From f6a08e2b4c63a325a74fabf67344f3179fbda6b3 Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Tue, 6 Mar 2018 11:49:29 +0100 Subject: [PATCH 01/11] Added new Zipkin tracing middleware using the native zipkin-go library --- examples/addsvc/cmd/addcli/addcli.go | 59 +++++--- examples/addsvc/cmd/addsvc/addsvc.go | 48 +++++-- examples/addsvc/cmd/addsvc/wiring_test.go | 6 +- examples/addsvc/pkg/addendpoint/set.go | 10 +- examples/addsvc/pkg/addtransport/grpc.go | 32 +++-- examples/addsvc/pkg/addtransport/http.go | 32 +++-- tracing/README.md | 50 ++++--- tracing/zipkin/README.md | 158 ++++++---------------- tracing/zipkin/doc.go | 5 + tracing/zipkin/endpoint.go | 51 +++++++ tracing/zipkin/endpoint_test.go | 125 +++++++++++++++++ tracing/zipkin/grpc.go | 40 ++++++ tracing/zipkin/grpc_test.go | 74 ++++++++++ tracing/zipkin/http.go | 52 +++++++ tracing/zipkin/http_test.go | 131 ++++++++++++++++++ 15 files changed, 689 insertions(+), 184 deletions(-) create mode 100644 tracing/zipkin/doc.go create mode 100644 tracing/zipkin/endpoint.go create mode 100644 tracing/zipkin/endpoint_test.go create mode 100644 tracing/zipkin/grpc.go create mode 100644 tracing/zipkin/grpc_test.go create mode 100644 tracing/zipkin/http.go create mode 100644 tracing/zipkin/http_test.go diff --git a/examples/addsvc/cmd/addcli/addcli.go b/examples/addsvc/cmd/addcli/addcli.go index 7e4aab4fb..4070cc703 100644 --- a/examples/addsvc/cmd/addcli/addcli.go +++ b/examples/addsvc/cmd/addcli/addcli.go @@ -14,7 +14,9 @@ import ( "github.com/apache/thrift/lib/go/thrift" lightstep "github.com/lightstep/lightstep-tracer-go" stdopentracing "github.com/opentracing/opentracing-go" - zipkin "github.com/openzipkin/zipkin-go-opentracing" + zipkin "github.com/openzipkin/zipkin-go" + zipkinot "github.com/openzipkin/zipkin-go-opentracing" + zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http" "sourcegraph.com/sourcegraph/appdash" appdashot "sourcegraph.com/sourcegraph/appdash/opentracing" @@ -41,9 +43,10 @@ func main() { thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") - zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans") - lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") - appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") + zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) via HTTP Reporter URL e.g. http://localhost:94111/api/v2/spans") + zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) via a collector URL e.g. http://localhost:9411/api/v1/spans") + lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") + appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") method = fs.String("method", "sum", "sum, concat") ) fs.Usage = usageFor(fs, os.Args[0]+" [flags] ") @@ -55,10 +58,10 @@ func main() { // This is a demonstration client, which supports multiple tracers. // Your clients will probably just use one tracer. - var tracer stdopentracing.Tracer + var otTracer stdopentracing.Tracer { - if *zipkinURL != "" { - collector, err := zipkin.NewHTTPCollector(*zipkinURL) + if *zipkinV1URL != "" { + collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL) if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) @@ -66,24 +69,44 @@ func main() { defer collector.Close() var ( debug = false - hostPort = "localhost:80" - serviceName = "addsvc" + hostPort = "localhost:0" + serviceName = "addsvc-cli" ) - recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName) - tracer, err = zipkin.NewTracer(recorder) + recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName) + otTracer, err = zipkinot.NewTracer(recorder) if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) } } else if *lightstepToken != "" { - tracer = lightstep.NewTracer(lightstep.Options{ + otTracer = lightstep.NewTracer(lightstep.Options{ AccessToken: *lightstepToken, }) - defer lightstep.FlushLightStepTracer(tracer) + defer lightstep.FlushLightStepTracer(otTracer) } else if *appdashAddr != "" { - tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) + otTracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) } else { - tracer = stdopentracing.GlobalTracer() // no-op + otTracer = stdopentracing.GlobalTracer() // no-op + } + } + + var zipkinTracer *zipkin.Tracer + { + var ( + err error + hostPort = "" // if host:port is unknown we can keep this empty + serviceName = "addsvc-cli" + ) + noopTracer := (*zipkinV2URL == "") + reporter := zipkinhttp.NewReporter(*zipkinV2URL) + defer reporter.Close() + zEP, _ := zipkin.NewEndpoint(serviceName, hostPort) + zipkinTracer, err = zipkin.NewTracer( + reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer), + ) + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) } } @@ -94,7 +117,7 @@ func main() { err error ) if *httpAddr != "" { - svc, err = addtransport.NewHTTPClient(*httpAddr, tracer, log.NewNopLogger()) + svc, err = addtransport.NewHTTPClient(*httpAddr, otTracer, zipkinTracer, log.NewNopLogger()) } else if *grpcAddr != "" { conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second)) if err != nil { @@ -102,9 +125,9 @@ func main() { os.Exit(1) } defer conn.Close() - svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger()) + svc = addtransport.NewGRPCClient(conn, otTracer, zipkinTracer, log.NewNopLogger()) } else if *jsonRPCAddr != "" { - svc, err = addtransport.NewJSONRPCClient(*jsonRPCAddr, tracer, log.NewNopLogger()) + svc, err = addtransport.NewJSONRPCClient(*jsonRPCAddr, otTracer, log.NewNopLogger()) } else if *thriftAddr != "" { // It's necessary to do all of this construction in the func main, // because (among other reasons) we need to control the lifecycle of the diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go index 71fe836b7..a0142a1a0 100644 --- a/examples/addsvc/cmd/addsvc/addsvc.go +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -14,7 +14,9 @@ import ( lightstep "github.com/lightstep/lightstep-tracer-go" "github.com/oklog/oklog/pkg/group" stdopentracing "github.com/opentracing/opentracing-go" - zipkin "github.com/openzipkin/zipkin-go-opentracing" + zipkin "github.com/openzipkin/zipkin-go" + zipkinot "github.com/openzipkin/zipkin-go-opentracing" + zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http" stdprometheus "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" @@ -46,9 +48,10 @@ func main() { thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") - zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans") - lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") - appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") + zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) using a Reporter URL e.g. http://localhost:9411/api/v2/spans") + zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) using a collector URL e.g. http://localhost:9411/api/v1/spans") + lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") + appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") ) fs.Usage = usageFor(fs, os.Args[0]+" [flags]") fs.Parse(os.Args[1:]) @@ -61,13 +64,13 @@ func main() { logger = log.With(logger, "caller", log.DefaultCaller) } - // Determine which tracer to use. We'll pass the tracer to all the + // Determine which OpenTracing tracer to use. We'll pass the tracer to all the // components that use it, as a dependency. var tracer stdopentracing.Tracer { - if *zipkinURL != "" { - logger.Log("tracer", "Zipkin", "URL", *zipkinURL) - collector, err := zipkin.NewHTTPCollector(*zipkinURL) + if *zipkinV1URL != "" { + logger.Log("tracer", "Zipkin", "URL", *zipkinV1URL) + collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL) if err != nil { logger.Log("err", err) os.Exit(1) @@ -78,8 +81,8 @@ func main() { hostPort = "localhost:80" serviceName = "addsvc" ) - recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName) - tracer, err = zipkin.NewTracer(recorder) + recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName) + tracer, err = zipkinot.NewTracer(recorder) if err != nil { logger.Log("err", err) os.Exit(1) @@ -99,6 +102,25 @@ func main() { } } + var zipkinTracer *zipkin.Tracer + { + var ( + err error + hostPort = "localhost:80" + serviceName = "addsvc" + ) + noopTracer := (*zipkinV2URL == "") + zEP, _ := zipkin.NewEndpoint(serviceName, hostPort) + reporter := zipkinhttp.NewReporter(*zipkinV2URL) + zipkinTracer, err = zipkin.NewTracer( + reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer), + ) + if err != nil { + logger.Log("err", err) + os.Exit(1) + } + } + // Create the (sparse) metrics we'll use in the service. They, too, are // dependencies that we pass to components that use them. var ints, chars metrics.Counter @@ -137,9 +159,9 @@ func main() { // them to ports or anything yet; we'll do that next. var ( service = addservice.New(logger, ints, chars) - endpoints = addendpoint.New(service, logger, duration, tracer) - httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger) - grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger) + endpoints = addendpoint.New(service, logger, duration, tracer, zipkinTracer) + httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, zipkinTracer, logger) + grpcServer = addtransport.NewGRPCServer(endpoints, tracer, zipkinTracer, logger) thriftServer = addtransport.NewThriftServer(endpoints) jsonrpcHandler = addtransport.NewJSONRPCHandler(endpoints, logger) ) diff --git a/examples/addsvc/cmd/addsvc/wiring_test.go b/examples/addsvc/cmd/addsvc/wiring_test.go index ca64bac1f..f9b3551b2 100644 --- a/examples/addsvc/cmd/addsvc/wiring_test.go +++ b/examples/addsvc/cmd/addsvc/wiring_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/opentracing/opentracing-go" + zipkin "github.com/openzipkin/zipkin-go" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics/discard" @@ -18,9 +19,10 @@ import ( ) func TestHTTP(t *testing.T) { + zkt, _ := zipkin.NewTracer(nil, zipkin.WithNoopTracer(true)) svc := addservice.New(log.NewNopLogger(), discard.NewCounter(), discard.NewCounter()) - eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer()) - mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), log.NewNopLogger()) + eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer(), zkt) + mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), zkt, log.NewNopLogger()) srv := httptest.NewServer(mux) defer srv.Close() diff --git a/examples/addsvc/pkg/addendpoint/set.go b/examples/addsvc/pkg/addendpoint/set.go index e4acaff47..b2c509142 100644 --- a/examples/addsvc/pkg/addendpoint/set.go +++ b/examples/addsvc/pkg/addendpoint/set.go @@ -7,6 +7,7 @@ import ( "golang.org/x/time/rate" stdopentracing "github.com/opentracing/opentracing-go" + stdzipkin "github.com/openzipkin/zipkin-go" "github.com/sony/gobreaker" "github.com/go-kit/kit/circuitbreaker" @@ -15,6 +16,7 @@ import ( "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" + "github.com/go-kit/kit/tracing/zipkin" "github.com/go-kit/kit/examples/addsvc/pkg/addservice" ) @@ -29,13 +31,14 @@ type Set struct { // New returns a Set that wraps the provided server, and wires in all of the // expected endpoint middlewares via the various parameters. -func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set { +func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer) Set { var sumEndpoint endpoint.Endpoint { sumEndpoint = MakeSumEndpoint(svc) sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint) - sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint) + sumEndpoint = opentracing.TraceServer(otTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceServer(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint) sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint) } @@ -44,7 +47,8 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, concatEndpoint = MakeConcatEndpoint(svc) concatEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint) - concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint) + concatEndpoint = opentracing.TraceServer(otTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceServer(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint) concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint) } diff --git a/examples/addsvc/pkg/addtransport/grpc.go b/examples/addsvc/pkg/addtransport/grpc.go index 6ec58d7f4..18a52dc3d 100644 --- a/examples/addsvc/pkg/addtransport/grpc.go +++ b/examples/addsvc/pkg/addtransport/grpc.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc" stdopentracing "github.com/opentracing/opentracing-go" + stdzipkin "github.com/openzipkin/zipkin-go" "github.com/sony/gobreaker" oldcontext "golang.org/x/net/context" "golang.org/x/time/rate" @@ -17,6 +18,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" + "github.com/go-kit/kit/tracing/zipkin" grpctransport "github.com/go-kit/kit/transport/grpc" "github.com/go-kit/kit/examples/addsvc/pb" @@ -30,7 +32,7 @@ type grpcServer struct { } // NewGRPCServer makes a set of endpoints available as a gRPC AddServer. -func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer { +func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer { options := []grpctransport.ServerOption{ grpctransport.ServerErrorLogger(logger), } @@ -39,13 +41,19 @@ func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logg endpoints.SumEndpoint, decodeGRPCSumRequest, encodeGRPCSumResponse, - append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Sum", logger)))..., + append(options, grpctransport.ServerBefore( + opentracing.GRPCToContext(otTracer, "Sum", logger), + zipkin.GRPCToContext(zipkinTracer, "Sum", logger), + ))..., ), concat: grpctransport.NewServer( endpoints.ConcatEndpoint, decodeGRPCConcatRequest, encodeGRPCConcatResponse, - append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Concat", logger)))..., + append(options, grpctransport.ServerBefore( + opentracing.GRPCToContext(otTracer, "Concat", logger), + zipkin.GRPCToContext(zipkinTracer, "Concat", logger), + ))..., ), } } @@ -70,7 +78,7 @@ func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb. // of the conn. The caller is responsible for constructing the conn, and // eventually closing the underlying transport. We bake-in certain middlewares, // implementing the client library pattern. -func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service { +func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) addservice.Service { // We construct a single ratelimiter middleware, to limit the total outgoing // QPS from this client to all methods on the remote instance. We also // construct per-endpoint circuitbreaker middlewares to demonstrate how @@ -91,9 +99,13 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l encodeGRPCSumRequest, decodeGRPCSumResponse, pb.SumReply{}, - grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)), + grpctransport.ClientBefore( + opentracing.ContextToGRPC(otTracer, logger), + zipkin.ContextToGRPC(zipkinTracer, logger), + ), ).Endpoint() - sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceClient(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -112,9 +124,13 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l encodeGRPCConcatRequest, decodeGRPCConcatResponse, pb.ConcatReply{}, - grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)), + grpctransport.ClientBefore( + opentracing.ContextToGRPC(otTracer, logger), + zipkin.ContextToGRPC(zipkinTracer, logger), + ), ).Endpoint() - concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceClient(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/examples/addsvc/pkg/addtransport/http.go b/examples/addsvc/pkg/addtransport/http.go index 3819c6d87..ebc5cc74e 100644 --- a/examples/addsvc/pkg/addtransport/http.go +++ b/examples/addsvc/pkg/addtransport/http.go @@ -14,6 +14,7 @@ import ( "golang.org/x/time/rate" stdopentracing "github.com/opentracing/opentracing-go" + stdzipkin "github.com/openzipkin/zipkin-go" "github.com/sony/gobreaker" "github.com/go-kit/kit/circuitbreaker" @@ -21,6 +22,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" + "github.com/go-kit/kit/tracing/zipkin" httptransport "github.com/go-kit/kit/transport/http" "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" @@ -29,7 +31,7 @@ import ( // NewHTTPHandler returns an HTTP handler that makes a set of endpoints // available on predefined paths. -func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler { +func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler { options := []httptransport.ServerOption{ httptransport.ServerErrorEncoder(errorEncoder), httptransport.ServerErrorLogger(logger), @@ -39,13 +41,19 @@ func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, log endpoints.SumEndpoint, decodeHTTPSumRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.HTTPToContext(tracer, "Sum", logger)))..., + append(options, httptransport.ServerBefore( + opentracing.HTTPToContext(otTracer, "Sum", logger), + zipkin.HTTPToContext(zipkinTracer, "Sum", logger), + ))..., )) m.Handle("/concat", httptransport.NewServer( endpoints.ConcatEndpoint, decodeHTTPConcatRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.HTTPToContext(tracer, "Concat", logger)))..., + append(options, httptransport.ServerBefore( + opentracing.HTTPToContext(otTracer, "Concat", logger), + zipkin.HTTPToContext(zipkinTracer, "Concat", logger), + ))..., )) return m } @@ -54,7 +62,7 @@ func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, log // remote instance. We expect instance to come from a service discovery system, // so likely of the form "host:port". We bake-in certain middlewares, // implementing the client library pattern. -func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) { +func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) (addservice.Service, error) { // Quickly sanitize the instance string. if !strings.HasPrefix(instance, "http") { instance = "http://" + instance @@ -82,9 +90,13 @@ func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Log copyURL(u, "/sum"), encodeHTTPGenericRequest, decodeHTTPSumResponse, - httptransport.ClientBefore(opentracing.ContextToHTTP(tracer, logger)), + httptransport.ClientBefore( + opentracing.ContextToHTTP(otTracer, logger), + zipkin.ContextToHTTP(zipkinTracer, logger), + ), ).Endpoint() - sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceClient(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -101,9 +113,13 @@ func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Log copyURL(u, "/concat"), encodeHTTPGenericRequest, decodeHTTPConcatResponse, - httptransport.ClientBefore(opentracing.ContextToHTTP(tracer, logger)), + httptransport.ClientBefore( + opentracing.ContextToHTTP(otTracer, logger), + zipkin.ContextToHTTP(zipkinTracer, logger), + ), ).Endpoint() - concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceClient(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/tracing/README.md b/tracing/README.md index d5e60e826..87242e4cc 100644 --- a/tracing/README.md +++ b/tracing/README.md @@ -1,6 +1,6 @@ # package tracing -`package tracing` provides [Dapper][]-style request tracing to services. +`package tracing` provides [Dapper]-style request tracing to services. ## Rationale @@ -10,37 +10,47 @@ hot spots, and diagnosing errors. All microservice infrastructures will benefit from request tracing; sufficiently large infrastructures will require it. -## OpenTracing +## Zipkin -Go kit builds on top of the [OpenTracing] API and uses the [opentracing-go] -package to provide tracing middlewares for its servers and clients. Currently -`kit/transport/http` and `kit/transport/grpc` transports are supported. +[Zipkin] is the most used OSS distributed tracing platform available with +support for many different languages and frameworks. Go kit provides bindings +to the native Go tracing implementation [zipkin-go]. If using Zipkin with Go +kit in a polyglot microservices environment, this is the preferred binding to +use. Instrumentation exists for `kit/transport/http` and `kit/transport/grpc`. +The bindings are highlighted in the [addsvc] example. For more information +regarding Zipkin feel free to visit [Zipkin's Gitter]. -Since [OpenTracing] is an upcoming standard API, Go kit should support a -multitude of tracing backends. If a Tracer implementation in Go for your -back-end exists, it should work out of the box. The following tracing back-ends -are known to work with Go kit through the OpenTracing interface and are -highlighted in the [addsvc] example. +## OpenTracing +Go kit supports the [OpenTracing] API and uses the [opentracing-go] package to +provide tracing middlewares for its servers and clients. Currently OpenTracing +instrumentation exists for `kit/transport/http` and `kit/transport/grpc`. -### LightStep +Since [OpenTracing] is an effort to provide a generic API, Go kit should support +a multitude of tracing backends. If a Tracer implementation or OpenTracing +bridge in Go for your back-end exists, it should work out of the box. -[LightStep] support is available through their standard Go package -[lightstep-tracer-go]. +Please note that the "world view" of existing tracing systems do differ. +OpenTracing can not guarantee you that tracing alignment is perfect in a +polyglot microservice environment or switching from one tracing backend to +another truly entails just a change in configuration. + +The following tracing back-ends are known to work with Go kit through the +OpenTracing interface and are highlighted in the [addsvc] example. ### AppDash [Appdash] support is available straight from their system repository in the [appdash/opentracing] directory. +### LightStep + +[LightStep] support is available through their standard Go package +[lightstep-tracer-go]. + ### Zipkin -[Zipkin] support is now available from the [zipkin-go-opentracing] package which -can be found at the [Open Zipkin GitHub] page. This means our old custom -`tracing/zipkin` package is now deprecated. In the `kit/tracing/zipkin` -directory you can still find the `docker-compose` script to bootstrap a Zipkin -development environment and a [README] detailing how to transition from the -old package to the new. +[Zipkin] support is available through the [zipkin-go-opentracing] package. [Dapper]: http://research.google.com/pubs/pub36356.html [addsvc]:https://github.com/go-kit/kit/tree/master/examples/addsvc @@ -52,6 +62,8 @@ old package to the new. [Zipkin]: http://zipkin.io/ [Open Zipkin GitHub]: https://github.com/openzipkin [zipkin-go-opentracing]: https://github.com/openzipkin/zipkin-go-opentracing +[zipkin-go]: https://github.com/openzipkin/zipkin-go +[Zipkin's Gitter]: https://gitter.im/openzipkin/zipkin [Appdash]: https://github.com/sourcegraph/appdash [appdash/opentracing]: https://github.com/sourcegraph/appdash/tree/master/opentracing diff --git a/tracing/zipkin/README.md b/tracing/zipkin/README.md index 36579b576..25549acd8 100644 --- a/tracing/zipkin/README.md +++ b/tracing/zipkin/README.md @@ -25,149 +25,81 @@ Collector. Follow the [addsvc] example to check out how to wire the Zipkin Middleware. The changes should be relatively minor. -The [zipkin-go-opentracing] package has support for HTTP, Kafka and Scribe -collectors as well as using Go Kit's [Log] package for logging. +The [zipkin-go] package has Reporters to send Spans to the Zipkin +HTTP and Kafka Collectors. -### Configuring for the Zipkin HTTP Collector +### Configuring the Zipkin HTTP Reporter -To select the transport for the HTTP Collector, you configure the `Recorder` -with the appropriate collector like this: +To use the HTTP Reporter with a Zipkin instance running on localhost you +bootstrap zipkin-go like this: ```go var ( - debugMode = false serviceName = "MyService" serviceHostPort = "localhost:8000" - zipkinHTTPEndpoint = "localhost:9411" + zipkinHTTPEndpoint = "http://localhost:9411/api/v2/spans" ) -collector, err = zipkin.NewHTTPCollector(zipkinHTTPEndpoint) -if err != nil { - // handle error -} -tracer, err = zipkin.NewTracer( - zipkin.NewRecorder(collector, debugMode, serviceHostPort, serviceName), - ... -) -``` - -### Span per Node vs. Span per RPC -By default Zipkin V1 considers either side of an RPC to have the same identity -and differs in that respect from many other tracing systems which consider the -caller to be the parent and the receiver to be the child. The OpenTracing -specification does not dictate one model over the other, but the Zipkin team is -looking into these [single-host-spans] to potentially bring Zipkin more in-line -with the other tracing systems. - -[single-host-spans]: https://github.com/openzipkin/zipkin/issues/963 - -In case of a `span per node` the receiver will create a child span from the -propagated parent span like this: - -``` -Span per Node propagation and identities - -CALLER: RECEIVER: ---------------------------------- -traceId -> traceId - spanId (new) -spanId -> parentSpanId -parentSpanId -``` -**Note:** most tracing implementations supporting the `span per node` model -therefore do not propagate their `parentSpanID` as its not needed. +// create an instance of the HTTP Reporter. +reporter := zipkin.NewReporter(zipkinHTTPEndpoint) -A typical Zipkin implementation will use the `span per RPC` model and recreate -the span identity from the caller on the receiver's end and then annotates its -values on top of it. Propagation will happen like this: +// create our tracer's local endpoint (how the service is identified in Zipkin). +localEndpoint, _ := zipkin.NewEndpoint(serviceName, serviceHostPort) -``` -Span per RPC propagation and identities - -CALLER: RECEIVER: ---------------------------------- -traceId -> traceId -spanId -> spanId -parentSpanId -> parentSpanId -``` - -The [zipkin-go-opentracing] implementation allows you to choose which model you -wish to use. Make sure you select the same model consistently for all your -services that are required to communicate with each other or you will have trace -propagation issues. If using non OpenTracing / legacy instrumentation, it's -probably best to use the `span per RPC call` model. - -To adhere to the more common tracing philosophy of `span per node`, the Tracer -defaults to `span per node`. To set the `span per RPC call` mode start your -tracer like this: +// create our tracer instance. +tracer, err = zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(localEndpoint)) + ... -```go -tracer, err = zipkin.NewTracer( - zipkin.NewRecorder(...), - zipkin.ClientServerSameSpan(true), -) ``` -[zipkin-go-opentracing]: https://github.com/openzipkin/zipkin-go-opentracing +[zipkin-go]: https://github.com/openzipkin/zipkin-go [addsvc]:https://github.com/go-kit/kit/tree/master/examples/addsvc [Log]: https://github.com/go-kit/kit/tree/master/log ### Tracing Resources -In our legacy implementation we had the `NewChildSpan` method to allow -annotation of resources such as databases, caches and other services that do not -have server side tracing support. Since OpenTracing has no specific method of -dealing with these items explicitely that is compatible with Zipkin's `SA` -annotation, the [zipkin-go-opentracing] has implemented support using the -OpenTracing Tags system. Here is an example of how one would be able to record -a resource span compatible with standard OpenTracing and triggering an `SA` -annotation in [zipkin-go-opentracing]: - +Here is an example of how you could trace resources and work with local spans. ```go -// you need to import the ext package for the Tag helper functions import ( - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" + zipkin "github.com/openzipkin/zipkin-go" ) func (svc *Service) GetMeSomeExamples(ctx context.Context, ...) ([]Examples, error) { - // Example of annotating a database query: - var ( - serviceName = "MySQL" - serviceHost = "mysql.example.com" - servicePort = uint16(3306) - queryLabel = "GetExamplesByParam" - query = "select * from example where param = 'value'" - ) - - // retrieve the parent span, if not found create a new trace - parentSpan := opentracing.SpanFromContext(ctx) - if parentSpan == nil { - parentSpan = opentracing.StartSpan(queryLabel) - defer parentSpan.Finish() - } - - // create a new span to record the resource interaction - span := opentracing.StartChildSpan(parentSpan, queryLabel) - - // span.kind "resource" triggers SA annotation - ext.SpanKind.Set(span, "resource") - - // this will label the span's service & hostPort (called Endpoint in Zipkin) - ext.PeerService.Set(span, serviceName) - ext.PeerHostname.Set(span, serviceHost) - ext.PeerPort.Set(span, servicePort) - - // a Tag is the equivalent of a Zipkin Binary Annotation (key:value pair) + // Example of annotating a database query: + var ( + spanContext model.SpanContext + serviceName = "MySQL" + serviceHost = "mysql.example.com:3306" + queryLabel = "GetExamplesByParam" + query = "select * from example where param = :value" + ) + + // retrieve the parent span from context to use as parent, if not found we + // start a new trace + if parentSpan := zipkin.SpanFromContext(ctx); parentSpan != nil { + spanContext = parentSpan.Context() + } + + // create the remote Zipkin endpoint + ep, _ := zipkin.NewEndpoint(serviceName, serviceHost) + + // create a new span to record the resource interaction + span := zipkin.StartSpan( + queryLabel, + zipkin.Parent(parentSpan.Context()), + zipkin.WithRemoteEndpoint(ep), + ) + + // add interesting key/value pair to our span span.SetTag("query", query) - // a LogEvent is the equivalent of a Zipkin Annotation (timestamped) - span.LogEvent("query:start") + // add interesting timed event to our span + span.Annotate(time.Now(), "query:start") // do the actual query... // let's annotate the end... - span.LogEvent("query:end") + span.Annotate(time.Now(), "query:end") // we're done with this span. span.Finish() diff --git a/tracing/zipkin/doc.go b/tracing/zipkin/doc.go new file mode 100644 index 000000000..82f6d6c57 --- /dev/null +++ b/tracing/zipkin/doc.go @@ -0,0 +1,5 @@ +// Package zipkin provides Go kit integration to the OpenZipkin project through +// the use of zipkin-go, the official OpenZipkin tracer implementation for Go. +// OpenZipkin is the most used open source distributed tracing ecosystem with +// many different libraries and interoperability options. +package zipkin diff --git a/tracing/zipkin/endpoint.go b/tracing/zipkin/endpoint.go new file mode 100644 index 000000000..324de66c1 --- /dev/null +++ b/tracing/zipkin/endpoint.go @@ -0,0 +1,51 @@ +package zipkin + +import ( + "context" + + "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" + + "github.com/go-kit/kit/endpoint" +) + +// TraceServer returns a Middleware that wraps the `next` Endpoint in a Zipkin +// Span called `operationName`. +// +// If `ctx` already has a Span, it is re-used and the operation name is +// overwritten. If `ctx` does not yet have a Span, one is created here. +func TraceServer(tracer *zipkin.Tracer, operationName string) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + var sp zipkin.Span + // try to retrieve Span from Go context, create new Span if not found. + if sp = zipkin.SpanFromContext(ctx); sp == nil { + sp = tracer.StartSpan(operationName, zipkin.Kind(model.Server)) + ctx = zipkin.NewContext(ctx, sp) + } else { + sp.SetName(operationName) + } + defer sp.Finish() + return next(ctx, request) + } + } +} + +// TraceClient returns a Middleware that wraps the `next` Endpoint in a Zipkin +// Span called `operationName`. +func TraceClient(tracer *zipkin.Tracer, operationName string) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + var spanOpts = []zipkin.SpanOption{zipkin.Kind(model.Client)} + // try to retrieve Span from Go context, use its SpanContext if found. + if parentSpan := zipkin.SpanFromContext(ctx); parentSpan != nil { + spanOpts = append(spanOpts, zipkin.Parent(parentSpan.Context())) + } + // create new client span (if sc is empty, Parent is a noop) + sp := tracer.StartSpan(operationName, spanOpts...) + defer sp.Finish() + ctx = zipkin.NewContext(ctx, sp) + return next(ctx, request) + } + } +} diff --git a/tracing/zipkin/endpoint_test.go b/tracing/zipkin/endpoint_test.go new file mode 100644 index 000000000..b3a817b08 --- /dev/null +++ b/tracing/zipkin/endpoint_test.go @@ -0,0 +1,125 @@ +package zipkin_test + +import ( + "context" + "testing" + + "github.com/go-kit/kit/endpoint" + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/reporter/recorder" + + kitzipkin "github.com/go-kit/kit/tracing/zipkin" +) + +func TestTraceServer(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + // Initialize the ctx with a nameless Span. + contextSpan := tracer.StartSpan("") + ctx := zipkin.NewContext(context.Background(), contextSpan) + + tracedEndpoint := kitzipkin.TraceServer(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { + t.Fatal(err) + } + + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + // Test that the op name is updated + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.Name; want != have { + t.Fatalf("Want %q, have %q", want, have) + } + contextContext := contextSpan.Context() + endpointContext := endpointSpan.SpanContext + // ...and that the ID is unmodified. + if want, have := contextContext.ID, endpointContext.ID; want != have { + t.Errorf("Want SpanID %q, have %q", want, have) + } +} + +func TestTraceServerNoContextSpan(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + // Empty/background context. + tracedEndpoint := kitzipkin.TraceServer(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.Name; want != have { + t.Fatalf("Want %q, have %q", want, have) + } +} + +func TestTraceClient(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + // Initialize the ctx with a parent Span. + parentSpan := tracer.StartSpan("parent") + defer parentSpan.Finish() + ctx := zipkin.NewContext(context.Background(), parentSpan) + + tracedEndpoint := kitzipkin.TraceClient(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.Name; want != have { + t.Fatalf("Want %q, have %q", want, have) + } + + parentContext := parentSpan.Context() + endpointContext := parentSpan.Context() + + // ... and that the parent ID is set appropriately. + if want, have := parentContext.ID, endpointContext.ID; want != have { + t.Errorf("Want ParentID %q, have %q", want, have) + } +} + +func TestTraceClientNoContextSpan(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + // Empty/background context. + tracedEndpoint := kitzipkin.TraceClient(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.Name; want != have { + t.Fatalf("Want %q, have %q", want, have) + } +} diff --git a/tracing/zipkin/grpc.go b/tracing/zipkin/grpc.go new file mode 100644 index 000000000..18a66937f --- /dev/null +++ b/tracing/zipkin/grpc.go @@ -0,0 +1,40 @@ +package zipkin + +import ( + "context" + + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" + "github.com/openzipkin/zipkin-go/propagation/b3" + "google.golang.org/grpc/metadata" + + "github.com/go-kit/kit/log" +) + +// ContextToGRPC returns a grpc RequestFunc that injects a Zipkin Span found in +// `ctx` into the grpc Metadata. If no such Span can be found, the RequestFunc +// is a noop. +func ContextToGRPC(tracer *zipkin.Tracer, logger log.Logger) func(context.Context, *metadata.MD) context.Context { + return func(ctx context.Context, md *metadata.MD) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + // There's nothing we can do with an error here. + if err := b3.InjectGRPC(md)(span.Context()); err != nil { + logger.Log("err", err) + } + } + return ctx + } +} + +// GRPCToContext returns a grpc RequestFunc that tries to join with a Zipkin +// trace found in `req` and starts a new Span called `operationName` +// accordingly. If no trace could be found in `req`, the Span +// will be a trace root. The Span is incorporated in the returned Context and +// can be retrieved with zipkin.SpanFromContext(ctx). +func GRPCToContext(tracer *zipkin.Tracer, operationName string, logger log.Logger) func(ctx context.Context, md metadata.MD) context.Context { + return func(ctx context.Context, md metadata.MD) context.Context { + spanContext := tracer.Extract(b3.ExtractGRPC(&md)) + span := tracer.StartSpan(operationName, zipkin.Kind(model.Server), zipkin.Parent(spanContext)) + return zipkin.NewContext(ctx, span) + } +} diff --git a/tracing/zipkin/grpc_test.go b/tracing/zipkin/grpc_test.go new file mode 100644 index 000000000..3b2326f71 --- /dev/null +++ b/tracing/zipkin/grpc_test.go @@ -0,0 +1,74 @@ +package zipkin_test + +import ( + "context" + "testing" + + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/reporter/recorder" + "google.golang.org/grpc/metadata" + + "github.com/go-kit/kit/log" + kitzipkin "github.com/go-kit/kit/tracing/zipkin" +) + +func TestTraceGRPCRequestRoundtrip(t *testing.T) { + logger := log.NewNopLogger() + reporter := recorder.NewReporter() + defer reporter.Close() + + // we disable shared rpc spans so we can test parent-child relationship + tracer, _ := zipkin.NewTracer(reporter, zipkin.WithSharedSpans(false)) + + // Initialize the ctx with a Span to inject. + beforeSpan := tracer.StartSpan("to_inject") + beforeCtx := zipkin.NewContext(context.Background(), beforeSpan) + + toGRPCFunc := kitzipkin.ContextToGRPC(tracer, logger) + md := metadata.Pairs() + // Call the RequestFunc. + afterCtx := toGRPCFunc(beforeCtx, &md) + + // The Span should not have changed. + afterSpan := zipkin.SpanFromContext(afterCtx) + if beforeSpan != afterSpan { + t.Error("Should not swap in a new span") + } + + // No spans should have finished yet. + finishedSpans := reporter.Flush() + if want, have := 0, len(finishedSpans); want != have { + t.Errorf("Want %v span(s), found %v", want, have) + } + + // Use GRPCToContext to verify that we can join with the trace given MD. + fromGRPCFunc := kitzipkin.GRPCToContext(tracer, "joined", logger) + joinCtx := fromGRPCFunc(afterCtx, md) + joinedSpan := zipkin.SpanFromContext(joinCtx) + + joinedSpan.Finish() + beforeSpan.Finish() + + finishedSpans = reporter.Flush() + if want, have := 2, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + joined := finishedSpans[0] + before := finishedSpans[1] + + if joined.SpanContext.ID == before.SpanContext.ID { + t.Error("Span.ID should have changed", joined.SpanContext.ID, before.SpanContext.ID) + } + + // Check that the parent/child relationship is as expected for the joined span. + if joined.SpanContext.ParentID == nil { + t.Fatalf("Want ParentID %q, have nil", before.SpanContext.ID) + } + if want, have := before.SpanContext.ID, *joined.SpanContext.ParentID; want != have { + t.Errorf("Want ParentID %q, have %q", want, have) + } + if want, have := "joined", joined.Name; want != have { + t.Errorf("Want %q, have %q", want, have) + } +} diff --git a/tracing/zipkin/http.go b/tracing/zipkin/http.go new file mode 100644 index 000000000..c1cc0a197 --- /dev/null +++ b/tracing/zipkin/http.go @@ -0,0 +1,52 @@ +package zipkin + +import ( + "context" + "net/http" + + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" + "github.com/openzipkin/zipkin-go/propagation/b3" + + "github.com/go-kit/kit/log" + kithttp "github.com/go-kit/kit/transport/http" +) + +// ContextToHTTP returns an http RequestFunc that injects a Zipkin Span found +// in `ctx` into the http headers. If no such Span can be found, the RequestFunc +// is a noop. +func ContextToHTTP(tracer *zipkin.Tracer, logger log.Logger) kithttp.RequestFunc { + return func(ctx context.Context, req *http.Request) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + // add some common Zipkin Tags + zipkin.TagHTTPMethod.Set(span, req.Method) + zipkin.TagHTTPUrl.Set(span, req.URL.String()) + if endpoint, err := zipkin.NewEndpoint("", req.URL.Host); err == nil { + span.SetRemoteEndpoint(endpoint) + } + // There's nothing we can do with any errors here. + if err := b3.InjectHTTP(req)(span.Context()); err != nil { + logger.Log("err", err) + } + } + return ctx + } +} + +// HTTPToContext returns an http RequestFunc that tries to join with a Zipkin +// trace found in `req` and starts a new Span called `operationName` +// accordingly. If no trace could be found in `req`, the Span will be a trace +// root. The Span is incorporated in the returned Context and can be retrieved +// with zipkin.SpanFromContext(ctx). +func HTTPToContext(tracer *zipkin.Tracer, operationName string, logger log.Logger) kithttp.RequestFunc { + return func(ctx context.Context, req *http.Request) context.Context { + spanContext := tracer.Extract(b3.ExtractHTTP(req)) + span := tracer.StartSpan( + operationName, zipkin.Kind(model.Server), zipkin.Parent(spanContext), + ) + // add some common Zipkin Tags + zipkin.TagHTTPMethod.Set(span, req.Method) + zipkin.TagHTTPUrl.Set(span, req.URL.String()) + return zipkin.NewContext(ctx, span) + } +} diff --git a/tracing/zipkin/http_test.go b/tracing/zipkin/http_test.go new file mode 100644 index 000000000..99afacfdc --- /dev/null +++ b/tracing/zipkin/http_test.go @@ -0,0 +1,131 @@ +package zipkin_test + +import ( + "context" + "net/http" + "reflect" + "testing" + + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/propagation/b3" + "github.com/openzipkin/zipkin-go/reporter/recorder" + + "github.com/go-kit/kit/log" + kitzipkin "github.com/go-kit/kit/tracing/zipkin" +) + +func TestTraceHTTPRequestRoundtrip(t *testing.T) { + logger := log.NewNopLogger() + reporter := recorder.NewReporter() + defer reporter.Close() + + // we disable shared rpc spans so we can test parent-child relationship + tracer, _ := zipkin.NewTracer(reporter, zipkin.WithSharedSpans(false)) + + // Initialize the ctx with a Span to inject. + beforeSpan := tracer.StartSpan("to_inject") + beforeCtx := zipkin.NewContext(context.Background(), beforeSpan) + + toHTTPFunc := kitzipkin.ContextToHTTP(tracer, logger) + req, _ := http.NewRequest("GET", "http://test.biz/path", nil) + // Call the RequestFunc. + afterCtx := toHTTPFunc(beforeCtx, req) + + // The Span should not have changed. + afterSpan := zipkin.SpanFromContext(afterCtx) + if beforeSpan != afterSpan { + t.Error("Should not swap in a new span") + } + + // No spans should have finished yet. + finishedSpans := reporter.Flush() + if want, have := 0, len(finishedSpans); want != have { + t.Errorf("Want %v span(s), found %v", want, have) + } + + // Use HTTPToContext to verify that we can join with the trace given a req. + fromHTTPFunc := kitzipkin.HTTPToContext(tracer, "joined", logger) + joinCtx := fromHTTPFunc(afterCtx, req) + joinedSpan := zipkin.SpanFromContext(joinCtx) + + joinedSpan.Finish() + beforeSpan.Finish() + + finishedSpans = reporter.Flush() + if want, have := 2, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + joined := finishedSpans[0] + before := finishedSpans[1] + + if joined.SpanContext.ID == before.SpanContext.ID { + t.Error("SpanID should have changed", joined.SpanContext.ID, before.SpanContext.ID) + } + + // Check that the parent/child relationship is as expected for the joined span. + if joined.SpanContext.ParentID == nil { + t.Fatalf("Want ParentID %q, have nil", before.SpanContext.ID) + } + if want, have := before.SpanContext.ID, *joined.SpanContext.ParentID; want != have { + t.Errorf("Want ParentID %q, have %q", want, have) + } + if want, have := "joined", joined.Name; want != have { + t.Errorf("Want %q, have %q", want, have) + } +} + +func TestContextToHTTPTags(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + span := tracer.StartSpan("to_inject") + ctx := zipkin.NewContext(context.Background(), span) + req, _ := http.NewRequest("GET", "http://test.biz/path", nil) + + kitzipkin.ContextToHTTP(tracer, log.NewNopLogger())(ctx, req) + + expectedTags := map[string]string{ + string(zipkin.TagHTTPMethod): "GET", + string(zipkin.TagHTTPUrl): "http://test.biz/path", + } + + span.Finish() + + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + if !reflect.DeepEqual(expectedTags, finishedSpans[0].Tags) { + t.Errorf("Want %q, have %q", expectedTags, finishedSpans[0].Tags) + } +} + +func TestHTTPToContextTags(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + parentSpan := tracer.StartSpan("to_extract") + defer parentSpan.Finish() + + req, _ := http.NewRequest("GET", "http://test.biz/path", nil) + b3.InjectHTTP(req)(parentSpan.Context()) + + ctx := kitzipkin.HTTPToContext(tracer, "op", log.NewNopLogger())(context.Background(), req) + zipkin.SpanFromContext(ctx).Finish() + + childSpan := reporter.Flush()[0] + expectedTags := map[string]string{ + string(zipkin.TagHTTPMethod): "GET", + string(zipkin.TagHTTPUrl): "http://test.biz/path", + } + if !reflect.DeepEqual(expectedTags, childSpan.Tags) { + t.Errorf("Want %q, have %q", expectedTags, childSpan.Tags) + } + if want, have := "op", childSpan.Name; want != have { + t.Errorf("Want %q, have %q", want, have) + } +} From c75b3618f52af0721c1acaa309f298b239e2f738 Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Tue, 9 Jan 2018 20:42:15 +0100 Subject: [PATCH 02/11] new approach to Zipkin tracing middleware for Go kit --- examples/addsvc/cmd/addsvc/addsvc.go | 5 +- examples/addsvc/pkg/addendpoint/set.go | 4 +- examples/addsvc/pkg/addtransport/grpc.go | 52 ++++-- examples/addsvc/pkg/addtransport/http.go | 44 +++-- examples/apigateway/main.go | 12 +- tracing/zipkin/endpoint.go | 39 +---- tracing/zipkin/endpoint_test.go | 125 -------------- tracing/zipkin/grpc.go | 195 ++++++++++++++++++--- tracing/zipkin/grpc_test.go | 74 -------- tracing/zipkin/http.go | 206 +++++++++++++++++++---- tracing/zipkin/http_test.go | 131 -------------- tracing/zipkin/options.go | 54 ++++++ transport/grpc/client.go | 35 +++- transport/grpc/request_response_funcs.go | 6 + transport/grpc/server.go | 58 +++++-- transport/http/client.go | 10 +- transport/http/server.go | 10 +- 17 files changed, 578 insertions(+), 482 deletions(-) delete mode 100644 tracing/zipkin/endpoint_test.go delete mode 100644 tracing/zipkin/grpc_test.go delete mode 100644 tracing/zipkin/http_test.go create mode 100644 tracing/zipkin/options.go diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go index a0142a1a0..c34c45062 100644 --- a/examples/addsvc/cmd/addsvc/addsvc.go +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/prometheus" + kitgrpc "github.com/go-kit/kit/transport/grpc" addpb "github.com/go-kit/kit/examples/addsvc/pb" "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" @@ -218,7 +219,9 @@ func main() { } g.Add(func() error { logger.Log("transport", "gRPC", "addr", *grpcAddr) - baseServer := grpc.NewServer() + // we add the Go Kit gRPC Interceptor to our gRPC service as it is used by + // the here demonstrated zipkin tracing middleware. + baseServer := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor)) addpb.RegisterAddServer(baseServer, grpcServer) return baseServer.Serve(grpcListener) }, func(error) { diff --git a/examples/addsvc/pkg/addendpoint/set.go b/examples/addsvc/pkg/addendpoint/set.go index b2c509142..8e42d6f31 100644 --- a/examples/addsvc/pkg/addendpoint/set.go +++ b/examples/addsvc/pkg/addendpoint/set.go @@ -38,7 +38,7 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint) sumEndpoint = opentracing.TraceServer(otTracer, "Sum")(sumEndpoint) - sumEndpoint = zipkin.TraceServer(zipkinTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint) sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint) } @@ -48,7 +48,7 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, concatEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint) concatEndpoint = opentracing.TraceServer(otTracer, "Concat")(concatEndpoint) - concatEndpoint = zipkin.TraceServer(zipkinTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint) concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint) } diff --git a/examples/addsvc/pkg/addtransport/grpc.go b/examples/addsvc/pkg/addtransport/grpc.go index 18a52dc3d..6f54219ed 100644 --- a/examples/addsvc/pkg/addtransport/grpc.go +++ b/examples/addsvc/pkg/addtransport/grpc.go @@ -33,27 +33,31 @@ type grpcServer struct { // NewGRPCServer makes a set of endpoints available as a gRPC AddServer. func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer { + // Zipkin GRPC Server Trace can either be instantiated per endpoint with a + // provided operation name or a global tracing service can be instantiated + // without an operation name and fed to each Go kit endpoint as ServerOption. + // In the latter case, the operation name will be the endpoint's grpc method + // path. + // We demonstrate a global tracing service here. + zipkinServer := zipkin.GRPCServerTrace(zipkinTracer) + options := []grpctransport.ServerOption{ grpctransport.ServerErrorLogger(logger), + zipkinServer, } + return &grpcServer{ sum: grpctransport.NewServer( endpoints.SumEndpoint, decodeGRPCSumRequest, encodeGRPCSumResponse, - append(options, grpctransport.ServerBefore( - opentracing.GRPCToContext(otTracer, "Sum", logger), - zipkin.GRPCToContext(zipkinTracer, "Sum", logger), - ))..., + append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Sum", logger)))..., ), concat: grpctransport.NewServer( endpoints.ConcatEndpoint, decodeGRPCConcatRequest, encodeGRPCConcatResponse, - append(options, grpctransport.ServerBefore( - opentracing.GRPCToContext(otTracer, "Concat", logger), - zipkin.GRPCToContext(zipkinTracer, "Concat", logger), - ))..., + append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Concat", logger)))..., ), } } @@ -86,6 +90,18 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin // for the entire remote instance, too. limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) + // Zipkin GRPC Client Trace can either be instantiated per endpoint with a + // provided operation name or a global tracing client can be instantiated + // without an operation name and fed to each Go kit endpoint as ClientOption. + // In the latter case, the operation name will be the endpoint's grpc method + // path. + zipkinClient := zipkin.GRPCClientTrace(zipkinTracer) + + // global client middlewares + options := []grpctransport.ClientOption{ + zipkinClient, + } + // Each individual endpoint is an http/transport.Client (which implements // endpoint.Endpoint) that gets wrapped with various middlewares. If you // made your own client library, you'd do this work there, so your server @@ -99,13 +115,13 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin encodeGRPCSumRequest, decodeGRPCSumResponse, pb.SumReply{}, - grpctransport.ClientBefore( - opentracing.ContextToGRPC(otTracer, logger), - zipkin.ContextToGRPC(zipkinTracer, logger), - ), + append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) - sumEndpoint = zipkin.TraceClient(zipkinTracer, "Sum")(sumEndpoint) + // For additional information TraceEndpoint is added as endpoint middleware. + // If instantiating per endpoint ClientTracers on the Go kit gRPC client, + // you might not want this additional middleware and thus could be omitted. + sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -124,13 +140,13 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin encodeGRPCConcatRequest, decodeGRPCConcatResponse, pb.ConcatReply{}, - grpctransport.ClientBefore( - opentracing.ContextToGRPC(otTracer, logger), - zipkin.ContextToGRPC(zipkinTracer, logger), - ), + append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) - concatEndpoint = zipkin.TraceClient(zipkinTracer, "Concat")(concatEndpoint) + // For additional information TraceEndpoint is added as endpoint middleware. + // If instantiating per endpoint ClientTracers on the Go kit gRPC client, + // you might not want this additional middleware and thus could be omitted. + concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/examples/addsvc/pkg/addtransport/http.go b/examples/addsvc/pkg/addtransport/http.go index ebc5cc74e..0eb4e7358 100644 --- a/examples/addsvc/pkg/addtransport/http.go +++ b/examples/addsvc/pkg/addtransport/http.go @@ -32,28 +32,31 @@ import ( // NewHTTPHandler returns an HTTP handler that makes a set of endpoints // available on predefined paths. func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler { + // Zipkin HTTP Server Trace can either be instantiated per endpoint with a + // provided operation name or a global tracing service can be instantiated + // without an operation name and fed to each Go kit endpoint as ServerOption. + // In the latter case, the operation name will be the endpoint's http method. + // We demonstrate a global tracing service here. + zipkinServer := zipkin.HTTPServerTrace(zipkinTracer) + options := []httptransport.ServerOption{ httptransport.ServerErrorEncoder(errorEncoder), httptransport.ServerErrorLogger(logger), + zipkinServer, } + m := http.NewServeMux() m.Handle("/sum", httptransport.NewServer( endpoints.SumEndpoint, decodeHTTPSumRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore( - opentracing.HTTPToContext(otTracer, "Sum", logger), - zipkin.HTTPToContext(zipkinTracer, "Sum", logger), - ))..., + append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Sum", logger)))..., )) m.Handle("/concat", httptransport.NewServer( endpoints.ConcatEndpoint, decodeHTTPConcatRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore( - opentracing.HTTPToContext(otTracer, "Concat", logger), - zipkin.HTTPToContext(zipkinTracer, "Concat", logger), - ))..., + append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Concat", logger)))..., )) return m } @@ -79,6 +82,17 @@ func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer // for the entire remote instance, too. limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) + // Zipkin HTTP Client Trace can either be instantiated per endpoint with a + // provided operation name or a global tracing client can be instantiated + // without an operation name and fed to each Go kit endpoint as ClientOption. + // In the latter case, the operation name will be the endpoint's http method. + zipkinClient := zipkin.HTTPClientTrace(zipkinTracer) + + // global client middlewares + options := []httptransport.ClientOption{ + zipkinClient, + } + // Each individual endpoint is an http/transport.Client (which implements // endpoint.Endpoint) that gets wrapped with various middlewares. If you // made your own client library, you'd do this work there, so your server @@ -90,13 +104,10 @@ func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer copyURL(u, "/sum"), encodeHTTPGenericRequest, decodeHTTPSumResponse, - httptransport.ClientBefore( - opentracing.ContextToHTTP(otTracer, logger), - zipkin.ContextToHTTP(zipkinTracer, logger), - ), + append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))..., ).Endpoint() sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) - sumEndpoint = zipkin.TraceClient(zipkinTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -113,13 +124,10 @@ func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer copyURL(u, "/concat"), encodeHTTPGenericRequest, decodeHTTPConcatResponse, - httptransport.ClientBefore( - opentracing.ContextToHTTP(otTracer, logger), - zipkin.ContextToHTTP(zipkinTracer, logger), - ), + append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))..., ).Endpoint() concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) - concatEndpoint = zipkin.TraceClient(zipkinTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/examples/apigateway/main.go b/examples/apigateway/main.go index 3b4cd675e..5891241d1 100644 --- a/examples/apigateway/main.go +++ b/examples/apigateway/main.go @@ -20,6 +20,7 @@ import ( "github.com/gorilla/mux" "github.com/hashicorp/consul/api" stdopentracing "github.com/opentracing/opentracing-go" + stdzipkin "github.com/openzipkin/zipkin-go" "google.golang.org/grpc" "github.com/go-kit/kit/endpoint" @@ -67,6 +68,7 @@ func main() { // Transport domain. tracer := stdopentracing.GlobalTracer() // no-op + zipkinTracer, _ := stdzipkin.NewTracer(nil, stdzipkin.WithNoopTracer(true)) ctx := context.Background() r := mux.NewRouter() @@ -88,14 +90,14 @@ func main() { instancer = consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly) ) { - factory := addsvcFactory(addendpoint.MakeSumEndpoint, tracer, logger) + factory := addsvcFactory(addendpoint.MakeSumEndpoint, tracer, zipkinTracer, logger) endpointer := sd.NewEndpointer(instancer, factory, logger) balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) endpoints.SumEndpoint = retry } { - factory := addsvcFactory(addendpoint.MakeConcatEndpoint, tracer, logger) + factory := addsvcFactory(addendpoint.MakeConcatEndpoint, tracer, zipkinTracer, logger) endpointer := sd.NewEndpointer(instancer, factory, logger) balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) @@ -106,7 +108,7 @@ func main() { // HTTP handler, and just install it under a particular path prefix in // our router. - r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addtransport.NewHTTPHandler(endpoints, tracer, logger))) + r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addtransport.NewHTTPHandler(endpoints, tracer, zipkinTracer, logger))) } // stringsvc routes. @@ -165,7 +167,7 @@ func main() { logger.Log("exit", <-errc) } -func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, logger log.Logger) sd.Factory { +func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) sd.Factory { return func(instance string) (endpoint.Endpoint, io.Closer, error) { // We could just as easily use the HTTP or Thrift client package to make // the connection to addsvc. We've chosen gRPC arbitrarily. Note that @@ -176,7 +178,7 @@ func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, trac if err != nil { return nil, nil, err } - service := addtransport.NewGRPCClient(conn, tracer, logger) + service := addtransport.NewGRPCClient(conn, tracer, zipkinTracer, logger) endpoint := makeEndpoint(service) // Notice that the addsvc gRPC client converts the connection to a diff --git a/tracing/zipkin/endpoint.go b/tracing/zipkin/endpoint.go index 324de66c1..e004bf24f 100644 --- a/tracing/zipkin/endpoint.go +++ b/tracing/zipkin/endpoint.go @@ -9,41 +9,20 @@ import ( "github.com/go-kit/kit/endpoint" ) -// TraceServer returns a Middleware that wraps the `next` Endpoint in a Zipkin -// Span called `operationName`. -// -// If `ctx` already has a Span, it is re-used and the operation name is -// overwritten. If `ctx` does not yet have a Span, one is created here. -func TraceServer(tracer *zipkin.Tracer, operationName string) endpoint.Middleware { +// TraceEndpoint returns an Endpoint middleware, tracing a Go kit endpoint. +// This endpoint tracer should be used in combination with a Go kit Transport +// tracing middleware or custom before and after transport functions as +// propagation of SpanContext is not provided in this middleware. +func TraceEndpoint(tracer *zipkin.Tracer, name string) endpoint.Middleware { return func(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { - var sp zipkin.Span - // try to retrieve Span from Go context, create new Span if not found. - if sp = zipkin.SpanFromContext(ctx); sp == nil { - sp = tracer.StartSpan(operationName, zipkin.Kind(model.Server)) - ctx = zipkin.NewContext(ctx, sp) - } else { - sp.SetName(operationName) - } - defer sp.Finish() - return next(ctx, request) - } - } -} - -// TraceClient returns a Middleware that wraps the `next` Endpoint in a Zipkin -// Span called `operationName`. -func TraceClient(tracer *zipkin.Tracer, operationName string) endpoint.Middleware { - return func(next endpoint.Endpoint) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - var spanOpts = []zipkin.SpanOption{zipkin.Kind(model.Client)} - // try to retrieve Span from Go context, use its SpanContext if found. + var sc model.SpanContext if parentSpan := zipkin.SpanFromContext(ctx); parentSpan != nil { - spanOpts = append(spanOpts, zipkin.Parent(parentSpan.Context())) + sc = parentSpan.Context() } - // create new client span (if sc is empty, Parent is a noop) - sp := tracer.StartSpan(operationName, spanOpts...) + sp := tracer.StartSpan(name, zipkin.Parent(sc)) defer sp.Finish() + ctx = zipkin.NewContext(ctx, sp) return next(ctx, request) } diff --git a/tracing/zipkin/endpoint_test.go b/tracing/zipkin/endpoint_test.go deleted file mode 100644 index b3a817b08..000000000 --- a/tracing/zipkin/endpoint_test.go +++ /dev/null @@ -1,125 +0,0 @@ -package zipkin_test - -import ( - "context" - "testing" - - "github.com/go-kit/kit/endpoint" - zipkin "github.com/openzipkin/zipkin-go" - "github.com/openzipkin/zipkin-go/reporter/recorder" - - kitzipkin "github.com/go-kit/kit/tracing/zipkin" -) - -func TestTraceServer(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - // Initialize the ctx with a nameless Span. - contextSpan := tracer.StartSpan("") - ctx := zipkin.NewContext(context.Background(), contextSpan) - - tracedEndpoint := kitzipkin.TraceServer(tracer, "testOp")(endpoint.Nop) - if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { - t.Fatal(err) - } - - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - // Test that the op name is updated - endpointSpan := finishedSpans[0] - if want, have := "testOp", endpointSpan.Name; want != have { - t.Fatalf("Want %q, have %q", want, have) - } - contextContext := contextSpan.Context() - endpointContext := endpointSpan.SpanContext - // ...and that the ID is unmodified. - if want, have := contextContext.ID, endpointContext.ID; want != have { - t.Errorf("Want SpanID %q, have %q", want, have) - } -} - -func TestTraceServerNoContextSpan(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - // Empty/background context. - tracedEndpoint := kitzipkin.TraceServer(tracer, "testOp")(endpoint.Nop) - if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { - t.Fatal(err) - } - - // tracedEndpoint created a new Span. - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - endpointSpan := finishedSpans[0] - if want, have := "testOp", endpointSpan.Name; want != have { - t.Fatalf("Want %q, have %q", want, have) - } -} - -func TestTraceClient(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - // Initialize the ctx with a parent Span. - parentSpan := tracer.StartSpan("parent") - defer parentSpan.Finish() - ctx := zipkin.NewContext(context.Background(), parentSpan) - - tracedEndpoint := kitzipkin.TraceClient(tracer, "testOp")(endpoint.Nop) - if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { - t.Fatal(err) - } - - // tracedEndpoint created a new Span. - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - endpointSpan := finishedSpans[0] - if want, have := "testOp", endpointSpan.Name; want != have { - t.Fatalf("Want %q, have %q", want, have) - } - - parentContext := parentSpan.Context() - endpointContext := parentSpan.Context() - - // ... and that the parent ID is set appropriately. - if want, have := parentContext.ID, endpointContext.ID; want != have { - t.Errorf("Want ParentID %q, have %q", want, have) - } -} - -func TestTraceClientNoContextSpan(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - // Empty/background context. - tracedEndpoint := kitzipkin.TraceClient(tracer, "testOp")(endpoint.Nop) - if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { - t.Fatal(err) - } - - // tracedEndpoint created a new Span. - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - endpointSpan := finishedSpans[0] - if want, have := "testOp", endpointSpan.Name; want != have { - t.Fatalf("Want %q, have %q", want, have) - } -} diff --git a/tracing/zipkin/grpc.go b/tracing/zipkin/grpc.go index 18a66937f..e582ad0e7 100644 --- a/tracing/zipkin/grpc.go +++ b/tracing/zipkin/grpc.go @@ -2,39 +2,190 @@ package zipkin import ( "context" + "strconv" zipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/model" "github.com/openzipkin/zipkin-go/propagation/b3" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "github.com/go-kit/kit/log" + kitgrpc "github.com/go-kit/kit/transport/grpc" ) -// ContextToGRPC returns a grpc RequestFunc that injects a Zipkin Span found in -// `ctx` into the grpc Metadata. If no such Span can be found, the RequestFunc -// is a noop. -func ContextToGRPC(tracer *zipkin.Tracer, logger log.Logger) func(context.Context, *metadata.MD) context.Context { - return func(ctx context.Context, md *metadata.MD) context.Context { - if span := zipkin.SpanFromContext(ctx); span != nil { - // There's nothing we can do with an error here. - if err := b3.InjectGRPC(md)(span.Context()); err != nil { - logger.Log("err", err) - } - } - return ctx +// GRPCClientTrace enables Zipkin tracing of a Go kit gRPC Client Transport. +func GRPCClientTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ClientOption { + config := tracerOptions{ + tags: make(map[string]string), + name: "", + logger: log.NewNopLogger(), + propagate: true, } + + for _, option := range options { + option(&config) + } + + clientBefore := kitgrpc.ClientBefore( + func(ctx context.Context, md *metadata.MD) context.Context { + var ( + spanContext model.SpanContext + name string + ) + + if config.name != "" { + name = config.name + } else { + name = ctx.Value(kitgrpc.ContextKeyRequestMethod).(string) + } + + if parent := zipkin.SpanFromContext(ctx); parent != nil { + spanContext = parent.Context() + } + + span := tracer.StartSpan( + name, + zipkin.Kind(model.Client), + zipkin.Tags(config.tags), + zipkin.Parent(spanContext), + zipkin.FlushOnFinish(false), + ) + + if config.propagate { + if err := b3.InjectGRPC(md)(span.Context()); err != nil { + config.logger.Log("err", err) + } + } + + return zipkin.NewContext(ctx, span) + }, + ) + + clientAfter := kitgrpc.ClientAfter( + func(ctx context.Context, _ metadata.MD, _ metadata.MD) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + span.Finish() + } + + return ctx + }, + ) + + clientFinalizer := kitgrpc.ClientFinalizer( + func(ctx context.Context, err error) { + if span := zipkin.SpanFromContext(ctx); span != nil { + if err != nil { + zipkin.TagError.Set(span, err.Error()) + } + // calling span.Finish() a second time is a noop, if we didn't get to + // ClientAfter we can at least time the early bail out by calling it + // here. + span.Finish() + // send span to the Reporter + span.Flush() + } + }, + ) + + return func(c *kitgrpc.Client) { + clientBefore(c) + clientAfter(c) + clientFinalizer(c) + } + } -// GRPCToContext returns a grpc RequestFunc that tries to join with a Zipkin -// trace found in `req` and starts a new Span called `operationName` -// accordingly. If no trace could be found in `req`, the Span -// will be a trace root. The Span is incorporated in the returned Context and -// can be retrieved with zipkin.SpanFromContext(ctx). -func GRPCToContext(tracer *zipkin.Tracer, operationName string, logger log.Logger) func(ctx context.Context, md metadata.MD) context.Context { - return func(ctx context.Context, md metadata.MD) context.Context { - spanContext := tracer.Extract(b3.ExtractGRPC(&md)) - span := tracer.StartSpan(operationName, zipkin.Kind(model.Server), zipkin.Parent(spanContext)) - return zipkin.NewContext(ctx, span) +// GRPCServerTrace enables Zipkin tracing of a Go kit gRPC Server Transport. +func GRPCServerTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ServerOption { + config := tracerOptions{ + tags: make(map[string]string), + name: "", + logger: log.NewNopLogger(), + propagate: true, + } + + for _, option := range options { + option(&config) + } + + serverBefore := kitgrpc.ServerBefore( + func(ctx context.Context, md metadata.MD) context.Context { + var ( + spanContext model.SpanContext + name string + tags = make(map[string]string) + ) + + rpcMethod, ok := ctx.Value(kitgrpc.ContextKeyRequestMethod).(string) + if !ok { + config.logger.Log("unable to retrieve method name: missing gRPC interceptor hook") + } else { + tags["grpc.method"] = rpcMethod + } + + if config.name != "" { + name = config.name + } else { + name = rpcMethod + } + + if config.propagate { + spanContext = tracer.Extract(b3.ExtractGRPC(&md)) + if spanContext.Err != nil { + config.logger.Log("err", spanContext.Err) + } + } + + span := tracer.StartSpan( + name, + zipkin.Kind(model.Server), + zipkin.Tags(config.tags), + zipkin.Tags(tags), + zipkin.Parent(spanContext), + zipkin.FlushOnFinish(false), + ) + + return zipkin.NewContext(ctx, span) + }, + ) + + serverAfter := kitgrpc.ServerAfter( + func(ctx context.Context, _ *metadata.MD, _ *metadata.MD) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + span.Finish() + } + + return ctx + }, + ) + + serverFinalizer := kitgrpc.ServerFinalizer( + func(ctx context.Context, err error) { + if span := zipkin.SpanFromContext(ctx); span != nil { + if err != nil { + if status, ok := status.FromError(err); ok { + statusCode := strconv.FormatUint(uint64(status.Code()), 10) + zipkin.TagGRPCStatusCode.Set(span, statusCode) + zipkin.TagError.Set(span, status.Message()) + } else { + zipkin.TagError.Set(span, err.Error()) + } + } + + // calling span.Finish() a second time is a noop, if we didn't get to + // ServerAfter we can at least time the early bail out by calling it + // here. + span.Finish() + // send span to the Reporter + span.Flush() + } + }, + ) + + return func(s *kitgrpc.Server) { + serverBefore(s) + serverAfter(s) + serverFinalizer(s) } } diff --git a/tracing/zipkin/grpc_test.go b/tracing/zipkin/grpc_test.go deleted file mode 100644 index 3b2326f71..000000000 --- a/tracing/zipkin/grpc_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package zipkin_test - -import ( - "context" - "testing" - - zipkin "github.com/openzipkin/zipkin-go" - "github.com/openzipkin/zipkin-go/reporter/recorder" - "google.golang.org/grpc/metadata" - - "github.com/go-kit/kit/log" - kitzipkin "github.com/go-kit/kit/tracing/zipkin" -) - -func TestTraceGRPCRequestRoundtrip(t *testing.T) { - logger := log.NewNopLogger() - reporter := recorder.NewReporter() - defer reporter.Close() - - // we disable shared rpc spans so we can test parent-child relationship - tracer, _ := zipkin.NewTracer(reporter, zipkin.WithSharedSpans(false)) - - // Initialize the ctx with a Span to inject. - beforeSpan := tracer.StartSpan("to_inject") - beforeCtx := zipkin.NewContext(context.Background(), beforeSpan) - - toGRPCFunc := kitzipkin.ContextToGRPC(tracer, logger) - md := metadata.Pairs() - // Call the RequestFunc. - afterCtx := toGRPCFunc(beforeCtx, &md) - - // The Span should not have changed. - afterSpan := zipkin.SpanFromContext(afterCtx) - if beforeSpan != afterSpan { - t.Error("Should not swap in a new span") - } - - // No spans should have finished yet. - finishedSpans := reporter.Flush() - if want, have := 0, len(finishedSpans); want != have { - t.Errorf("Want %v span(s), found %v", want, have) - } - - // Use GRPCToContext to verify that we can join with the trace given MD. - fromGRPCFunc := kitzipkin.GRPCToContext(tracer, "joined", logger) - joinCtx := fromGRPCFunc(afterCtx, md) - joinedSpan := zipkin.SpanFromContext(joinCtx) - - joinedSpan.Finish() - beforeSpan.Finish() - - finishedSpans = reporter.Flush() - if want, have := 2, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - joined := finishedSpans[0] - before := finishedSpans[1] - - if joined.SpanContext.ID == before.SpanContext.ID { - t.Error("Span.ID should have changed", joined.SpanContext.ID, before.SpanContext.ID) - } - - // Check that the parent/child relationship is as expected for the joined span. - if joined.SpanContext.ParentID == nil { - t.Fatalf("Want ParentID %q, have nil", before.SpanContext.ID) - } - if want, have := before.SpanContext.ID, *joined.SpanContext.ParentID; want != have { - t.Errorf("Want ParentID %q, have %q", want, have) - } - if want, have := "joined", joined.Name; want != have { - t.Errorf("Want %q, have %q", want, have) - } -} diff --git a/tracing/zipkin/http.go b/tracing/zipkin/http.go index c1cc0a197..d17f39d56 100644 --- a/tracing/zipkin/http.go +++ b/tracing/zipkin/http.go @@ -3,6 +3,7 @@ package zipkin import ( "context" "net/http" + "strconv" zipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/model" @@ -12,41 +13,180 @@ import ( kithttp "github.com/go-kit/kit/transport/http" ) -// ContextToHTTP returns an http RequestFunc that injects a Zipkin Span found -// in `ctx` into the http headers. If no such Span can be found, the RequestFunc -// is a noop. -func ContextToHTTP(tracer *zipkin.Tracer, logger log.Logger) kithttp.RequestFunc { - return func(ctx context.Context, req *http.Request) context.Context { - if span := zipkin.SpanFromContext(ctx); span != nil { - // add some common Zipkin Tags - zipkin.TagHTTPMethod.Set(span, req.Method) - zipkin.TagHTTPUrl.Set(span, req.URL.String()) - if endpoint, err := zipkin.NewEndpoint("", req.URL.Host); err == nil { - span.SetRemoteEndpoint(endpoint) - } - // There's nothing we can do with any errors here. - if err := b3.InjectHTTP(req)(span.Context()); err != nil { - logger.Log("err", err) - } - } - return ctx +// HTTPClientTrace enables Zipkin tracing of a Go kit HTTP Client Transport. +func HTTPClientTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ClientOption { + config := tracerOptions{ + tags: make(map[string]string), + name: "", + logger: log.NewNopLogger(), + propagate: true, + } + + for _, option := range options { + option(&config) + } + + clientBefore := kithttp.ClientBefore( + func(ctx context.Context, req *http.Request) context.Context { + var ( + spanContext model.SpanContext + name string + ) + + if config.name != "" { + name = config.name + } else { + name = req.Method + } + + if parent := zipkin.SpanFromContext(ctx); parent != nil { + spanContext = parent.Context() + } + + tags := map[string]string{ + string(zipkin.TagHTTPMethod): req.Method, + string(zipkin.TagHTTPUrl): req.URL.String(), + } + + span := tracer.StartSpan( + name, + zipkin.Kind(model.Client), + zipkin.Tags(config.tags), + zipkin.Tags(tags), + zipkin.Parent(spanContext), + zipkin.FlushOnFinish(false), + ) + + if config.propagate { + if err := b3.InjectHTTP(req)(span.Context()); err != nil { + config.logger.Log("err", err) + } + } + + return zipkin.NewContext(ctx, span) + }, + ) + + clientAfter := kithttp.ClientAfter( + func(ctx context.Context, res *http.Response) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + zipkin.TagHTTPResponseSize.Set(span, strconv.FormatInt(res.ContentLength, 10)) + zipkin.TagHTTPStatusCode.Set(span, strconv.Itoa(res.StatusCode)) + if res.StatusCode > 399 { + zipkin.TagError.Set(span, strconv.Itoa(res.StatusCode)) + } + span.Finish() + } + + return ctx + }, + ) + + clientFinalizer := kithttp.ClientFinalizer( + func(ctx context.Context, err error) { + if span := zipkin.SpanFromContext(ctx); span != nil { + if err != nil { + zipkin.TagError.Set(span, err.Error()) + } + // calling span.Finish() a second time is a noop, if we didn't get to + // ClientAfter we can at least time the early bail out by calling it + // here. + span.Finish() + // send span to the Reporter + span.Flush() + } + }, + ) + + return func(c *kithttp.Client) { + clientBefore(c) + clientAfter(c) + clientFinalizer(c) } } -// HTTPToContext returns an http RequestFunc that tries to join with a Zipkin -// trace found in `req` and starts a new Span called `operationName` -// accordingly. If no trace could be found in `req`, the Span will be a trace -// root. The Span is incorporated in the returned Context and can be retrieved -// with zipkin.SpanFromContext(ctx). -func HTTPToContext(tracer *zipkin.Tracer, operationName string, logger log.Logger) kithttp.RequestFunc { - return func(ctx context.Context, req *http.Request) context.Context { - spanContext := tracer.Extract(b3.ExtractHTTP(req)) - span := tracer.StartSpan( - operationName, zipkin.Kind(model.Server), zipkin.Parent(spanContext), - ) - // add some common Zipkin Tags - zipkin.TagHTTPMethod.Set(span, req.Method) - zipkin.TagHTTPUrl.Set(span, req.URL.String()) - return zipkin.NewContext(ctx, span) +// HTTPServerTrace enables Zipkin tracing of a Go kit HTTP Server Transport. +func HTTPServerTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ServerOption { + config := tracerOptions{ + tags: make(map[string]string), + name: "", + logger: log.NewNopLogger(), + propagate: true, + } + + for _, option := range options { + option(&config) + } + + serverBefore := kithttp.ServerBefore( + func(ctx context.Context, req *http.Request) context.Context { + var ( + spanContext model.SpanContext + name string + ) + + if config.name != "" { + name = config.name + } else { + name = req.Method + } + + if config.propagate { + spanContext = tracer.Extract(b3.ExtractHTTP(req)) + if spanContext.Err != nil { + config.logger.Log("err", spanContext.Err) + } + } + + tags := map[string]string{ + string(zipkin.TagHTTPMethod): req.Method, + string(zipkin.TagHTTPPath): req.URL.Path, + } + + span := tracer.StartSpan( + name, + zipkin.Kind(model.Server), + zipkin.Tags(config.tags), + zipkin.Tags(tags), + zipkin.Parent(spanContext), + zipkin.FlushOnFinish(false), + ) + + return zipkin.NewContext(ctx, span) + }, + ) + + serverAfter := kithttp.ServerAfter( + func(ctx context.Context, _ http.ResponseWriter) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + span.Finish() + } + + return ctx + }, + ) + + serverFinalizer := kithttp.ServerFinalizer( + func(ctx context.Context, code int, r *http.Request) { + if span := zipkin.SpanFromContext(ctx); span != nil { + zipkin.TagHTTPStatusCode.Set(span, strconv.Itoa(code)) + if rs, ok := ctx.Value(kithttp.ContextKeyResponseSize).(int64); ok { + zipkin.TagHTTPResponseSize.Set(span, strconv.FormatInt(rs, 10)) + } + + // calling span.Finish() a second time is a noop, if we didn't get to + // ServerAfter we can at least time the early bail out by calling it + // here. + span.Finish() + // send span to the Reporter + span.Flush() + } + }, + ) + + return func(s *kithttp.Server) { + serverBefore(s) + serverAfter(s) + serverFinalizer(s) } } diff --git a/tracing/zipkin/http_test.go b/tracing/zipkin/http_test.go deleted file mode 100644 index 99afacfdc..000000000 --- a/tracing/zipkin/http_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package zipkin_test - -import ( - "context" - "net/http" - "reflect" - "testing" - - zipkin "github.com/openzipkin/zipkin-go" - "github.com/openzipkin/zipkin-go/propagation/b3" - "github.com/openzipkin/zipkin-go/reporter/recorder" - - "github.com/go-kit/kit/log" - kitzipkin "github.com/go-kit/kit/tracing/zipkin" -) - -func TestTraceHTTPRequestRoundtrip(t *testing.T) { - logger := log.NewNopLogger() - reporter := recorder.NewReporter() - defer reporter.Close() - - // we disable shared rpc spans so we can test parent-child relationship - tracer, _ := zipkin.NewTracer(reporter, zipkin.WithSharedSpans(false)) - - // Initialize the ctx with a Span to inject. - beforeSpan := tracer.StartSpan("to_inject") - beforeCtx := zipkin.NewContext(context.Background(), beforeSpan) - - toHTTPFunc := kitzipkin.ContextToHTTP(tracer, logger) - req, _ := http.NewRequest("GET", "http://test.biz/path", nil) - // Call the RequestFunc. - afterCtx := toHTTPFunc(beforeCtx, req) - - // The Span should not have changed. - afterSpan := zipkin.SpanFromContext(afterCtx) - if beforeSpan != afterSpan { - t.Error("Should not swap in a new span") - } - - // No spans should have finished yet. - finishedSpans := reporter.Flush() - if want, have := 0, len(finishedSpans); want != have { - t.Errorf("Want %v span(s), found %v", want, have) - } - - // Use HTTPToContext to verify that we can join with the trace given a req. - fromHTTPFunc := kitzipkin.HTTPToContext(tracer, "joined", logger) - joinCtx := fromHTTPFunc(afterCtx, req) - joinedSpan := zipkin.SpanFromContext(joinCtx) - - joinedSpan.Finish() - beforeSpan.Finish() - - finishedSpans = reporter.Flush() - if want, have := 2, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - joined := finishedSpans[0] - before := finishedSpans[1] - - if joined.SpanContext.ID == before.SpanContext.ID { - t.Error("SpanID should have changed", joined.SpanContext.ID, before.SpanContext.ID) - } - - // Check that the parent/child relationship is as expected for the joined span. - if joined.SpanContext.ParentID == nil { - t.Fatalf("Want ParentID %q, have nil", before.SpanContext.ID) - } - if want, have := before.SpanContext.ID, *joined.SpanContext.ParentID; want != have { - t.Errorf("Want ParentID %q, have %q", want, have) - } - if want, have := "joined", joined.Name; want != have { - t.Errorf("Want %q, have %q", want, have) - } -} - -func TestContextToHTTPTags(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - span := tracer.StartSpan("to_inject") - ctx := zipkin.NewContext(context.Background(), span) - req, _ := http.NewRequest("GET", "http://test.biz/path", nil) - - kitzipkin.ContextToHTTP(tracer, log.NewNopLogger())(ctx, req) - - expectedTags := map[string]string{ - string(zipkin.TagHTTPMethod): "GET", - string(zipkin.TagHTTPUrl): "http://test.biz/path", - } - - span.Finish() - - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - if !reflect.DeepEqual(expectedTags, finishedSpans[0].Tags) { - t.Errorf("Want %q, have %q", expectedTags, finishedSpans[0].Tags) - } -} - -func TestHTTPToContextTags(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - parentSpan := tracer.StartSpan("to_extract") - defer parentSpan.Finish() - - req, _ := http.NewRequest("GET", "http://test.biz/path", nil) - b3.InjectHTTP(req)(parentSpan.Context()) - - ctx := kitzipkin.HTTPToContext(tracer, "op", log.NewNopLogger())(context.Background(), req) - zipkin.SpanFromContext(ctx).Finish() - - childSpan := reporter.Flush()[0] - expectedTags := map[string]string{ - string(zipkin.TagHTTPMethod): "GET", - string(zipkin.TagHTTPUrl): "http://test.biz/path", - } - if !reflect.DeepEqual(expectedTags, childSpan.Tags) { - t.Errorf("Want %q, have %q", expectedTags, childSpan.Tags) - } - if want, have := "op", childSpan.Name; want != have { - t.Errorf("Want %q, have %q", want, have) - } -} diff --git a/tracing/zipkin/options.go b/tracing/zipkin/options.go new file mode 100644 index 000000000..073a94040 --- /dev/null +++ b/tracing/zipkin/options.go @@ -0,0 +1,54 @@ +package zipkin + +import "github.com/go-kit/kit/log" + +// Option allows for functional options to our Zipkin tracing middleware. +type Option func(o *tracerOptions) + +// Name sets the name for an instrumented transport endpoint. If name is omitted +// at tracing middleware creation, the method of the transport or transport rpc +// name is used. +func Name(name string) Option { + return func(o *tracerOptions) { + o.name = name + } +} + +// Tags adds default tags to our Zipkin transport spans. +func Tags(tags map[string]string) Option { + return func(o *tracerOptions) { + for k, v := range tags { + o.tags[k] = v + } + } +} + +// Logger adds a Go kit logger to our Zipkin Middleware to log SpanContext +// extract / inject errors if they occur. Default is Noop. +func Logger(logger log.Logger) Option { + return func(o *tracerOptions) { + if logger != nil { + o.logger = logger + } + } +} + +// AllowPropagation instructs the tracer to allow or deny propagation of the +// span context between this instrumented client or service and its peers. If +// the instrumented client connects to services outside its own platform or if +// the instrumented service receives requests from untrusted clients it is +// strongly advised to disallow propagation. Propagation between services inside +// your own platform benefit from propagation. Default for both TraceClient and +// TraceServer is to allow propagation. +func AllowPropagation(propagate bool) Option { + return func(o *tracerOptions) { + o.propagate = propagate + } +} + +type tracerOptions struct { + tags map[string]string + name string + logger log.Logger + propagate bool +} diff --git a/transport/grpc/client.go b/transport/grpc/client.go index 28c203f82..5d96c6b4d 100644 --- a/transport/grpc/client.go +++ b/transport/grpc/client.go @@ -22,6 +22,7 @@ type Client struct { grpcReply reflect.Type before []ClientRequestFunc after []ClientResponseFunc + finalizer []ClientFinalizerFunc } // NewClient constructs a usable Client for a single remote endpoint. @@ -75,13 +76,29 @@ func ClientAfter(after ...ClientResponseFunc) ClientOption { return func(c *Client) { c.after = append(c.after, after...) } } +// ClientFinalizer is executed at the end of every gRPC request. +// By default, no finalizer is registered. +func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption { + return func(s *Client) { s.finalizer = append(s.finalizer, f...) } +} + // Endpoint returns a usable endpoint that will invoke the gRPC specified by the // client. func (c Client) Endpoint() endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() + if c.finalizer != nil { + defer func() { + for _, f := range c.finalizer { + f(ctx, err) + } + }() + } + + ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method) + req, err := c.enc(ctx, request) if err != nil { return nil, err @@ -95,9 +112,9 @@ func (c Client) Endpoint() endpoint.Endpoint { var header, trailer metadata.MD grpcReply := reflect.New(c.grpcReply).Interface() - if err = grpc.Invoke( - ctx, c.method, req, grpcReply, c.client, - grpc.Header(&header), grpc.Trailer(&trailer), + if err = c.client.Invoke( + ctx, c.method, req, grpcReply, grpc.Header(&header), + grpc.Trailer(&trailer), ); err != nil { return nil, err } @@ -106,10 +123,18 @@ func (c Client) Endpoint() endpoint.Endpoint { ctx = f(ctx, header, trailer) } - response, err := c.dec(ctx, grpcReply) + response, err = c.dec(ctx, grpcReply) if err != nil { return nil, err } return response, nil } } + +// ClientFinalizerFunc can be used to perform work at the end of a client gRPC +// request, after the response is returned. The principal +// intended use is for error logging. Additional response parameters are +// provided in the context under keys with the ContextKeyResponse prefix. +// Note: err may be nil. There maybe also no additional response parameters depending on +// when an error occurs. +type ClientFinalizerFunc func(ctx context.Context, err error) diff --git a/transport/grpc/request_response_funcs.go b/transport/grpc/request_response_funcs.go index 8d072ede7..a797d3df3 100644 --- a/transport/grpc/request_response_funcs.go +++ b/transport/grpc/request_response_funcs.go @@ -74,3 +74,9 @@ func EncodeKeyValue(key, val string) (string, string) { } return key, val } + +type contextKey int + +const ( + ContextKeyRequestMethod contextKey = iota +) diff --git a/transport/grpc/server.go b/transport/grpc/server.go index 6da4bdb97..178c3f3b8 100644 --- a/transport/grpc/server.go +++ b/transport/grpc/server.go @@ -1,6 +1,8 @@ package grpc import ( + "context" + oldcontext "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -18,12 +20,13 @@ type Handler interface { // Server wraps an endpoint and implements grpc.Handler. type Server struct { - e endpoint.Endpoint - dec DecodeRequestFunc - enc EncodeResponseFunc - before []ServerRequestFunc - after []ServerResponseFunc - logger log.Logger + e endpoint.Endpoint + dec DecodeRequestFunc + enc EncodeResponseFunc + before []ServerRequestFunc + after []ServerResponseFunc + finalizer []ServerFinalizerFunc + logger log.Logger } // NewServer constructs a new server, which implements wraps the provided @@ -70,25 +73,45 @@ func ServerErrorLogger(logger log.Logger) ServerOption { return func(s *Server) { s.logger = logger } } +// ServerFinalizer is executed at the end of every gRPC request. +// By default, no finalizer is registered. +func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption { + return func(s *Server) { s.finalizer = append(s.finalizer, f...) } +} + // ServeGRPC implements the Handler interface. -func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (oldcontext.Context, interface{}, error) { +func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (retctx oldcontext.Context, resp interface{}, err error) { // Retrieve gRPC metadata. md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.MD{} } + if s.finalizer != nil { + defer func() { + for _, f := range s.finalizer { + f(ctx, err) + } + }() + } + for _, f := range s.before { ctx = f(ctx, md) } - request, err := s.dec(ctx, req) + var ( + request interface{} + response interface{} + grpcResp interface{} + ) + + request, err = s.dec(ctx, req) if err != nil { s.logger.Log("err", err) return ctx, nil, err } - response, err := s.e(ctx, request) + response, err = s.e(ctx, request) if err != nil { s.logger.Log("err", err) return ctx, nil, err @@ -99,7 +122,7 @@ func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (oldcontext.C ctx = f(ctx, &mdHeader, &mdTrailer) } - grpcResp, err := s.enc(ctx, response) + grpcResp, err = s.enc(ctx, response) if err != nil { s.logger.Log("err", err) return ctx, nil, err @@ -121,3 +144,18 @@ func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (oldcontext.C return ctx, grpcResp, nil } + +// ServerFinalizerFunc can be used to perform work at the end of an gRPC +// request, after the response has been written to the client. +type ServerFinalizerFunc func(ctx context.Context, err error) + +// Interceptor is a grpc UnaryInterceptor that injects the method name into +// context so it can be consumed by Go kit gRPC middlewares. The Interceptor +// typically is added at creation time of the grpc-go server. +// Like this: `grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))` +func Interceptor( + ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, +) (resp interface{}, err error) { + ctx = context.WithValue(ctx, ContextKeyRequestMethod, info.FullMethod) + return handler(ctx, req) +} diff --git a/transport/http/client.go b/transport/http/client.go index 25c078a58..92d3292fc 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -21,7 +21,7 @@ type Client struct { dec DecodeResponseFunc before []RequestFunc after []ClientResponseFunc - finalizer ClientFinalizerFunc + finalizer []ClientFinalizerFunc bufferedStream bool } @@ -73,8 +73,8 @@ func ClientAfter(after ...ClientResponseFunc) ClientOption { // ClientFinalizer is executed at the end of every HTTP request. // By default, no finalizer is registered. -func ClientFinalizer(f ClientFinalizerFunc) ClientOption { - return func(s *Client) { s.finalizer = f } +func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption { + return func(s *Client) { s.finalizer = append(s.finalizer, f...) } } // BufferedStream sets whether the Response.Body is left open, allowing it @@ -99,7 +99,9 @@ func (c Client) Endpoint() endpoint.Endpoint { ctx = context.WithValue(ctx, ContextKeyResponseHeaders, resp.Header) ctx = context.WithValue(ctx, ContextKeyResponseSize, resp.ContentLength) } - c.finalizer(ctx, err) + for _, f := range c.finalizer { + f(ctx, err) + } }() } diff --git a/transport/http/server.go b/transport/http/server.go index 183009d22..f03ec4e18 100644 --- a/transport/http/server.go +++ b/transport/http/server.go @@ -17,7 +17,7 @@ type Server struct { before []RequestFunc after []ServerResponseFunc errorEncoder ErrorEncoder - finalizer ServerFinalizerFunc + finalizer []ServerFinalizerFunc logger log.Logger } @@ -76,8 +76,8 @@ func ServerErrorLogger(logger log.Logger) ServerOption { // ServerFinalizer is executed at the end of every HTTP request. // By default, no finalizer is registered. -func ServerFinalizer(f ServerFinalizerFunc) ServerOption { - return func(s *Server) { s.finalizer = f } +func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption { + return func(s *Server) { s.finalizer = append(s.finalizer, f...) } } // ServeHTTP implements http.Handler. @@ -89,7 +89,9 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer func() { ctx = context.WithValue(ctx, ContextKeyResponseHeaders, iw.Header()) ctx = context.WithValue(ctx, ContextKeyResponseSize, iw.written) - s.finalizer(ctx, iw.code, r) + for _, f := range s.finalizer { + f(ctx, iw.code, r) + } }() w = iw } From 2c589694b3bbb34b3a20e95056ec2159bf4f35e1 Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Thu, 11 Jan 2018 16:09:00 +0100 Subject: [PATCH 03/11] improvements to zipkin tracer docs and code --- examples/addsvc/pkg/addtransport/grpc.go | 25 +++++++++++------------- tracing/zipkin/grpc.go | 17 +++++++++++++--- tracing/zipkin/http.go | 4 ++-- tracing/zipkin/options.go | 12 ++++++------ transport/grpc/server.go | 2 +- transport/http/server.go | 2 +- 6 files changed, 35 insertions(+), 27 deletions(-) diff --git a/examples/addsvc/pkg/addtransport/grpc.go b/examples/addsvc/pkg/addtransport/grpc.go index 6f54219ed..5bb8f91aa 100644 --- a/examples/addsvc/pkg/addtransport/grpc.go +++ b/examples/addsvc/pkg/addtransport/grpc.go @@ -33,12 +33,15 @@ type grpcServer struct { // NewGRPCServer makes a set of endpoints available as a gRPC AddServer. func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer { - // Zipkin GRPC Server Trace can either be instantiated per endpoint with a + // Zipkin GRPC Server Trace can either be instantiated per gRPC method with a // provided operation name or a global tracing service can be instantiated - // without an operation name and fed to each Go kit endpoint as ServerOption. + // without an operation name and fed to each Go kit gRPC server as a + // ServerOption. // In the latter case, the operation name will be the endpoint's grpc method - // path. - // We demonstrate a global tracing service here. + // path if used in combination with the Go kit gRPC Interceptor. + // + // In this example, we demonstrate a global Zipkin tracing service with + // Go kit gRPC Interceptor. zipkinServer := zipkin.GRPCServerTrace(zipkinTracer) options := []grpctransport.ServerOption{ @@ -90,11 +93,13 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin // for the entire remote instance, too. limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) - // Zipkin GRPC Client Trace can either be instantiated per endpoint with a + // Zipkin GRPC Client Trace can either be instantiated per gRPC method with a // provided operation name or a global tracing client can be instantiated - // without an operation name and fed to each Go kit endpoint as ClientOption. + // without an operation name and fed to each Go kit client as ClientOption. // In the latter case, the operation name will be the endpoint's grpc method // path. + // + // In this example, we demonstrace a global tracing client. zipkinClient := zipkin.GRPCClientTrace(zipkinTracer) // global client middlewares @@ -118,10 +123,6 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) - // For additional information TraceEndpoint is added as endpoint middleware. - // If instantiating per endpoint ClientTracers on the Go kit gRPC client, - // you might not want this additional middleware and thus could be omitted. - sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -143,10 +144,6 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) - // For additional information TraceEndpoint is added as endpoint middleware. - // If instantiating per endpoint ClientTracers on the Go kit gRPC client, - // you might not want this additional middleware and thus could be omitted. - concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/tracing/zipkin/grpc.go b/tracing/zipkin/grpc.go index e582ad0e7..5a54616d9 100644 --- a/tracing/zipkin/grpc.go +++ b/tracing/zipkin/grpc.go @@ -14,8 +14,19 @@ import ( kitgrpc "github.com/go-kit/kit/transport/grpc" ) -// GRPCClientTrace enables Zipkin tracing of a Go kit gRPC Client Transport. -func GRPCClientTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ClientOption { +// GRPCClientTrace enables native Zipkin tracing of a Go kit gRPC Client +// Transport invocation. +// +// Go kit creates client transports per gRPC method. This middleware can be +// set-up individually per service method by adding the method name for each of +// the Go kit method clients using the Name() TracerOption. +// If wanting to use the gRPC FullMethod (/service/method) as Span name you can +// create a global client tracer omitting the Name() TracerOption, which you can +// then feed to each Go kit method client. +// If instrumenting a client to an external (not on your platform) service, you +// might want to disallow propagation of SpanContext using the AllowPropagation +// TracerOption and setting it to false. +func GRPCClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ClientOption { config := tracerOptions{ tags: make(map[string]string), name: "", @@ -97,7 +108,7 @@ func GRPCClientTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ClientOpt } // GRPCServerTrace enables Zipkin tracing of a Go kit gRPC Server Transport. -func GRPCServerTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ServerOption { +func GRPCServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ServerOption { config := tracerOptions{ tags: make(map[string]string), name: "", diff --git a/tracing/zipkin/http.go b/tracing/zipkin/http.go index d17f39d56..c8f2c03ef 100644 --- a/tracing/zipkin/http.go +++ b/tracing/zipkin/http.go @@ -14,7 +14,7 @@ import ( ) // HTTPClientTrace enables Zipkin tracing of a Go kit HTTP Client Transport. -func HTTPClientTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ClientOption { +func HTTPClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ClientOption { config := tracerOptions{ tags: make(map[string]string), name: "", @@ -106,7 +106,7 @@ func HTTPClientTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ClientOpt } // HTTPServerTrace enables Zipkin tracing of a Go kit HTTP Server Transport. -func HTTPServerTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ServerOption { +func HTTPServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ServerOption { config := tracerOptions{ tags: make(map[string]string), name: "", diff --git a/tracing/zipkin/options.go b/tracing/zipkin/options.go index 073a94040..bdb4f97ee 100644 --- a/tracing/zipkin/options.go +++ b/tracing/zipkin/options.go @@ -2,20 +2,20 @@ package zipkin import "github.com/go-kit/kit/log" -// Option allows for functional options to our Zipkin tracing middleware. -type Option func(o *tracerOptions) +// TracerOption allows for functional options to our Zipkin tracing middleware. +type TracerOption func(o *tracerOptions) // Name sets the name for an instrumented transport endpoint. If name is omitted // at tracing middleware creation, the method of the transport or transport rpc // name is used. -func Name(name string) Option { +func Name(name string) TracerOption { return func(o *tracerOptions) { o.name = name } } // Tags adds default tags to our Zipkin transport spans. -func Tags(tags map[string]string) Option { +func Tags(tags map[string]string) TracerOption { return func(o *tracerOptions) { for k, v := range tags { o.tags[k] = v @@ -25,7 +25,7 @@ func Tags(tags map[string]string) Option { // Logger adds a Go kit logger to our Zipkin Middleware to log SpanContext // extract / inject errors if they occur. Default is Noop. -func Logger(logger log.Logger) Option { +func Logger(logger log.Logger) TracerOption { return func(o *tracerOptions) { if logger != nil { o.logger = logger @@ -40,7 +40,7 @@ func Logger(logger log.Logger) Option { // strongly advised to disallow propagation. Propagation between services inside // your own platform benefit from propagation. Default for both TraceClient and // TraceServer is to allow propagation. -func AllowPropagation(propagate bool) Option { +func AllowPropagation(propagate bool) TracerOption { return func(o *tracerOptions) { o.propagate = propagate } diff --git a/transport/grpc/server.go b/transport/grpc/server.go index 178c3f3b8..ccf4f0947 100644 --- a/transport/grpc/server.go +++ b/transport/grpc/server.go @@ -87,7 +87,7 @@ func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (retctx oldco md = metadata.MD{} } - if s.finalizer != nil { + if len(s.finalizer) > 0 { defer func() { for _, f := range s.finalizer { f(ctx, err) diff --git a/transport/http/server.go b/transport/http/server.go index f03ec4e18..e01bf870b 100644 --- a/transport/http/server.go +++ b/transport/http/server.go @@ -84,7 +84,7 @@ func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption { func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - if s.finalizer != nil { + if len(s.finalizer) > 0 { iw := &interceptingWriter{w, http.StatusOK, 0} defer func() { ctx = context.WithValue(ctx, ContextKeyResponseHeaders, iw.Header()) From 891c07c532503cd0a19f4fd46bcda87704382cd6 Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Wed, 17 Jan 2018 14:46:18 +0100 Subject: [PATCH 04/11] source comments changes --- examples/addsvc/cmd/addcli/addcli.go | 16 ++++++++------- examples/addsvc/cmd/addsvc/addsvc.go | 13 ++++++------ tracing/zipkin/grpc.go | 30 +++++++++++++++++++--------- tracing/zipkin/http.go | 27 +++++++++++++++++++++++-- 4 files changed, 62 insertions(+), 24 deletions(-) diff --git a/examples/addsvc/cmd/addcli/addcli.go b/examples/addsvc/cmd/addcli/addcli.go index 4070cc703..fdc5af071 100644 --- a/examples/addsvc/cmd/addcli/addcli.go +++ b/examples/addsvc/cmd/addcli/addcli.go @@ -90,22 +90,24 @@ func main() { } } + // This is a demonstration of the native Zipkin tracing client. If using + // Zipkin this is the more idiomatic client over OpenTracing. var zipkinTracer *zipkin.Tracer { var ( - err error - hostPort = "" // if host:port is unknown we can keep this empty - serviceName = "addsvc-cli" + err error + hostPort = "" // if host:port is unknown we can keep this empty + serviceName = "addsvc-cli" + useNoopTracer = (*zipkinV2URL == "") + reporter = zipkinhttp.NewReporter(*zipkinV2URL) ) - noopTracer := (*zipkinV2URL == "") - reporter := zipkinhttp.NewReporter(*zipkinV2URL) defer reporter.Close() zEP, _ := zipkin.NewEndpoint(serviceName, hostPort) zipkinTracer, err = zipkin.NewTracer( - reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer), + reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(useNoopTracer), ) if err != nil { - fmt.Fprintln(os.Stderr, err.Error()) + fmt.Fprintf(os.Stderr, "unable to create zipkin tracer: %s\n", err.Error()) os.Exit(1) } } diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go index c34c45062..f11fae6c1 100644 --- a/examples/addsvc/cmd/addsvc/addsvc.go +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -106,15 +106,16 @@ func main() { var zipkinTracer *zipkin.Tracer { var ( - err error - hostPort = "localhost:80" - serviceName = "addsvc" + err error + hostPort = "localhost:80" + serviceName = "addsvc" + useNoopTracer = (*zipkinV2URL == "") + reporter = zipkinhttp.NewReporter(*zipkinV2URL) ) - noopTracer := (*zipkinV2URL == "") + defer reporter.Close() zEP, _ := zipkin.NewEndpoint(serviceName, hostPort) - reporter := zipkinhttp.NewReporter(*zipkinV2URL) zipkinTracer, err = zipkin.NewTracer( - reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer), + reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(useNoopTracer), ) if err != nil { logger.Log("err", err) diff --git a/tracing/zipkin/grpc.go b/tracing/zipkin/grpc.go index 5a54616d9..adfa1745b 100644 --- a/tracing/zipkin/grpc.go +++ b/tracing/zipkin/grpc.go @@ -14,18 +14,18 @@ import ( kitgrpc "github.com/go-kit/kit/transport/grpc" ) -// GRPCClientTrace enables native Zipkin tracing of a Go kit gRPC Client -// Transport invocation. +// GRPCClientTrace enables native Zipkin tracing of a Go kit gRPC transport +// Client. // -// Go kit creates client transports per gRPC method. This middleware can be -// set-up individually per service method by adding the method name for each of -// the Go kit method clients using the Name() TracerOption. +// Go kit creates gRPC transport clients per remote endpoint. This middleware +// can be set-up individually by adding the endpoint name for each of the Go kit +// transport clients using the Name() TracerOption. // If wanting to use the gRPC FullMethod (/service/method) as Span name you can // create a global client tracer omitting the Name() TracerOption, which you can -// then feed to each Go kit method client. +// then feed to each Go kit gRPC transport client. // If instrumenting a client to an external (not on your platform) service, you -// might want to disallow propagation of SpanContext using the AllowPropagation -// TracerOption and setting it to false. +// will probably want to disallow propagation of SpanContext using the +// AllowPropagation TracerOption and setting it to false. func GRPCClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ClientOption { config := tracerOptions{ tags: make(map[string]string), @@ -107,7 +107,19 @@ func GRPCClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.Cli } -// GRPCServerTrace enables Zipkin tracing of a Go kit gRPC Server Transport. +// GRPCServerTrace enables native Zipkin tracing of a Go kit gRPC transport +// Server. +// +// Go kit creates gRPC transport servers per gRPC method. This middleware can be +// set-up individually by adding the method name for each of the Go kit method +// servers using the Name() TracerOption. +// If wanting to use the gRPC FullMethod (/service/method) as Span name you can +// create a global server tracer omitting the Name() TracerOption, which you can +// then feed to each Go kit method server. For this to work you will need to +// wire the Go kit gRPC Interceptor too. +// If instrumenting a service to external (not on your platform) clients, you +// will probably want to disallow propagation of a client SpanContext using +// the AllowPropagation TracerOption and setting it to false. func GRPCServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ServerOption { config := tracerOptions{ tags: make(map[string]string), diff --git a/tracing/zipkin/http.go b/tracing/zipkin/http.go index c8f2c03ef..aa5ff32ae 100644 --- a/tracing/zipkin/http.go +++ b/tracing/zipkin/http.go @@ -13,7 +13,18 @@ import ( kithttp "github.com/go-kit/kit/transport/http" ) -// HTTPClientTrace enables Zipkin tracing of a Go kit HTTP Client Transport. +// HTTPClientTrace enables native Zipkin tracing of a Go kit HTTP transport +// Client. +// +// Go kit creates HTTP transport clients per remote endpoint. This middleware +// can be set-up individually by adding the endpoint name for each of the Go kit +// transport clients using the Name() TracerOption. +// If wanting to use the HTTP Method (Get, Post, Put, etc.) as Span name you can +// create a global client tracer omitting the Name() TracerOption, which you can +// then feed to each Go kit transport client. +// If instrumenting a client to an external (not on your platform) service, you +// will probably want to disallow propagation of SpanContext using the +// AllowPropagation TracerOption and setting it to false. func HTTPClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ClientOption { config := tracerOptions{ tags: make(map[string]string), @@ -105,7 +116,19 @@ func HTTPClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.Cli } } -// HTTPServerTrace enables Zipkin tracing of a Go kit HTTP Server Transport. +// HTTPServerTrace enables native Zipkin tracing of a Go kit HTTP transport +// Server. +// +// Go kit creates HTTP transport servers per HTTP endpoint. This middleware can +// be set-up individually by adding the method name for each of the Go kit +// method servers using the Name() TracerOption. +// If wanting to use the HTTP method (Get, Post, Put, etc.) as Span name you can +// create a global server tracer omitting the Name() TracerOption, which you can +// then feed to each Go kit method server. +// +// If instrumenting a service to external (not on your platform) clients, you +// will probably want to disallow propagation of a client SpanContext using +// the AllowPropagation TracerOption and setting it to false. func HTTPServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ServerOption { config := tracerOptions{ tags: make(map[string]string), From b6d690e5a34d2c13ed69d225cd0821a2e34933f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Ch=C3=A1vez?= Date: Sat, 17 Feb 2018 09:57:23 -0500 Subject: [PATCH 05/11] Adds http test. --- tracing/zipkin/http_test.go | 160 ++++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 tracing/zipkin/http_test.go diff --git a/tracing/zipkin/http_test.go b/tracing/zipkin/http_test.go new file mode 100644 index 000000000..3f01cc2d5 --- /dev/null +++ b/tracing/zipkin/http_test.go @@ -0,0 +1,160 @@ +package zipkin_test + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "testing" + + zipkinkit "github.com/go-kit/kit/tracing/zipkin" + kithttp "github.com/go-kit/kit/transport/http" + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" + "github.com/openzipkin/zipkin-go/reporter/recorder" +) + +const ( + testName = "test" + testBody = "test_body" + testTagKey = "test_key" + testTagValue = "test_value" +) + +func TestHttpClientTracePropagatesParentSpan(t *testing.T) { + rec := recorder.NewReporter() + defer rec.Close() + + tr, _ := zipkin.NewTracer(rec) + + rURL, _ := url.Parse("http://test.com") + + c := kithttp.NewClient( + "GET", + rURL, + func(ctx context.Context, r *http.Request, i interface{}) error { + return nil + }, + func(ctx context.Context, r *http.Response) (response interface{}, err error) { + return nil, nil + }, + ) + + clientOption := zipkinkit.HTTPClientTrace(tr) + clientOption(c) + + parentSpan := tr.StartSpan("test") + + ctx := zipkin.NewContext(context.Background(), parentSpan) + + _, err := c.Endpoint()(ctx, nil) + if err != nil { + t.Fatalf("unwanted error: %s", err.Error()) + } + + spans := rec.Flush() + if want, have := 1, len(spans); want != have { + t.Fatalf("incorrect number of spans, wanted %d, got %d", want, have) + } + + span := spans[0] + if span.SpanContext.ParentID == nil { + t.Fatalf("incorrect parent ID, got nil") + } + + if want, have := parentSpan.Context().ID, *span.SpanContext.ParentID; want != have { + t.Fatalf("incorrect parent ID, wanted %s, got %s", want, have) + } +} + +func TestHTTPClientTraceAddsExpectedTags(t *testing.T) { + dataProvider := []struct { + ResponseStatusCode int + ErrorTagValue string + }{ + {http.StatusOK, ""}, + {http.StatusForbidden, fmt.Sprint(http.StatusForbidden)}, + } + + for _, data := range dataProvider { + testHTTPClientTraceCase(t, data.ResponseStatusCode, data.ErrorTagValue) + } +} + +func testHTTPClientTraceCase(t *testing.T, responseStatusCode int, errTagValue string) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(responseStatusCode) + w.Write([]byte(testBody)) + })) + defer ts.Close() + + rec := recorder.NewReporter() + defer rec.Close() + + tr, err := zipkin.NewTracer(rec) + if err != nil { + t.Errorf("Unwanted error: %s", err.Error()) + } + + rMethod := "GET" + rURL, _ := url.Parse(ts.URL) + + c := kithttp.NewClient( + rMethod, + rURL, + func(ctx context.Context, r *http.Request, i interface{}) error { + return nil + }, + func(ctx context.Context, r *http.Response) (response interface{}, err error) { + return nil, nil + }, + ) + + clientOption := zipkinkit.HTTPClientTrace( + tr, + zipkinkit.Name(testName), + zipkinkit.Tags(map[string]string{testTagKey: testTagValue}), + ) + clientOption(c) + + _, err = c.Endpoint()(context.Background(), nil) + if err != nil { + t.Fatalf("unwanted error: %s", err.Error()) + } + + spans := rec.Flush() + if want, have := 1, len(spans); want != have { + t.Fatalf("incorrect number of spans, wanted %d, got %d", want, have) + } + + span := spans[0] + if span.SpanContext.ParentID != nil { + t.Fatalf("incorrect parentID, wanted nil, got %s", span.SpanContext.ParentID) + } + + if want, have := testName, span.Name; want != have { + t.Fatalf("incorrect span name, wanted %s, got %s", want, have) + } + + if want, have := model.Client, span.Kind; want != have { + t.Fatalf("incorrect span kind, wanted %s, got %s", want, have) + } + + tags := map[string]string{ + testTagKey: testTagValue, + string(zipkin.TagHTTPStatusCode): fmt.Sprint(responseStatusCode), + string(zipkin.TagHTTPMethod): rMethod, + string(zipkin.TagHTTPUrl): rURL.String(), + string(zipkin.TagHTTPResponseSize): fmt.Sprint(len(testBody)), + } + + if errTagValue != "" { + tags[string(zipkin.TagError)] = fmt.Sprint(errTagValue) + } + + if !reflect.DeepEqual(span.Tags, tags) { + t.Fatalf("invalid tags set, wanted %+v, got %+v", tags, span.Tags) + } +} From 97a5330477c2c01b4ecd57d62a92660166b0aca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Ch=C3=A1vez?= Date: Sat, 17 Feb 2018 10:25:28 -0500 Subject: [PATCH 06/11] Adds endpoint test. --- tracing/zipkin/endpoint_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 tracing/zipkin/endpoint_test.go diff --git a/tracing/zipkin/endpoint_test.go b/tracing/zipkin/endpoint_test.go new file mode 100644 index 000000000..c0102e8ff --- /dev/null +++ b/tracing/zipkin/endpoint_test.go @@ -0,0 +1,30 @@ +package zipkin_test + +import ( + "context" + "testing" + + "github.com/go-kit/kit/endpoint" + zipkinkit "github.com/go-kit/kit/tracing/zipkin" + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/reporter/recorder" +) + +const spanName = "test" + +func TestTraceEndpoint(t *testing.T) { + rec := recorder.NewReporter() + tr, _ := zipkin.NewTracer(rec) + mw := zipkinkit.TraceEndpoint(tr, spanName) + mw(endpoint.Nop)(context.Background(), nil) + + spans := rec.Flush() + + if want, have := 1, len(spans); want != have { + t.Fatalf("incorrect number of spans, wanted %d, got %d", want, have) + } + + if want, have := spanName, spans[0].Name; want != have { + t.Fatalf("incorrect span name, wanted %s, got %s", want, have) + } +} From 3d35e45a03abcd10d3d9cd6987a955ff052d2060 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Ch=C3=A1vez?= Date: Sat, 17 Feb 2018 12:57:44 -0500 Subject: [PATCH 07/11] Improves code based on idiomatic feedback. --- tracing/zipkin/endpoint_test.go | 5 +++-- tracing/zipkin/http_test.go | 38 ++++++++++++++++----------------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/tracing/zipkin/endpoint_test.go b/tracing/zipkin/endpoint_test.go index c0102e8ff..7ed8aac8a 100644 --- a/tracing/zipkin/endpoint_test.go +++ b/tracing/zipkin/endpoint_test.go @@ -4,10 +4,11 @@ import ( "context" "testing" + "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/reporter/recorder" + "github.com/go-kit/kit/endpoint" zipkinkit "github.com/go-kit/kit/tracing/zipkin" - zipkin "github.com/openzipkin/zipkin-go" - "github.com/openzipkin/zipkin-go/reporter/recorder" ) const spanName = "test" diff --git a/tracing/zipkin/http_test.go b/tracing/zipkin/http_test.go index 3f01cc2d5..15c4d6444 100644 --- a/tracing/zipkin/http_test.go +++ b/tracing/zipkin/http_test.go @@ -9,11 +9,12 @@ import ( "reflect" "testing" - zipkinkit "github.com/go-kit/kit/tracing/zipkin" - kithttp "github.com/go-kit/kit/transport/http" - zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/model" "github.com/openzipkin/zipkin-go/reporter/recorder" + + zipkinkit "github.com/go-kit/kit/tracing/zipkin" + kithttp "github.com/go-kit/kit/transport/http" ) const ( @@ -31,7 +32,8 @@ func TestHttpClientTracePropagatesParentSpan(t *testing.T) { rURL, _ := url.Parse("http://test.com") - c := kithttp.NewClient( + clientTracer := zipkinkit.HTTPClientTrace(tr) + ep := kithttp.NewClient( "GET", rURL, func(ctx context.Context, r *http.Request, i interface{}) error { @@ -40,16 +42,14 @@ func TestHttpClientTracePropagatesParentSpan(t *testing.T) { func(ctx context.Context, r *http.Response) (response interface{}, err error) { return nil, nil }, - ) - - clientOption := zipkinkit.HTTPClientTrace(tr) - clientOption(c) + clientTracer, + ).Endpoint() parentSpan := tr.StartSpan("test") ctx := zipkin.NewContext(context.Background(), parentSpan) - _, err := c.Endpoint()(ctx, nil) + _, err := ep(ctx, nil) if err != nil { t.Fatalf("unwanted error: %s", err.Error()) } @@ -101,7 +101,13 @@ func testHTTPClientTraceCase(t *testing.T, responseStatusCode int, errTagValue s rMethod := "GET" rURL, _ := url.Parse(ts.URL) - c := kithttp.NewClient( + clientTracer := zipkinkit.HTTPClientTrace( + tr, + zipkinkit.Name(testName), + zipkinkit.Tags(map[string]string{testTagKey: testTagValue}), + ) + + ep := kithttp.NewClient( rMethod, rURL, func(ctx context.Context, r *http.Request, i interface{}) error { @@ -110,16 +116,10 @@ func testHTTPClientTraceCase(t *testing.T, responseStatusCode int, errTagValue s func(ctx context.Context, r *http.Response) (response interface{}, err error) { return nil, nil }, - ) - - clientOption := zipkinkit.HTTPClientTrace( - tr, - zipkinkit.Name(testName), - zipkinkit.Tags(map[string]string{testTagKey: testTagValue}), - ) - clientOption(c) + clientTracer, + ).Endpoint() - _, err = c.Endpoint()(context.Background(), nil) + _, err = ep(context.Background(), nil) if err != nil { t.Fatalf("unwanted error: %s", err.Error()) } From f98a50bb63867586d679b8c5a2e3fe62d65cb72c Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Tue, 6 Mar 2018 12:02:06 +0100 Subject: [PATCH 08/11] updates to tracing doc comments --- tracing/README.md | 19 ++++++++++--------- tracing/doc.go | 3 +-- tracing/zipkin/README.md | 24 +++++++++--------------- 3 files changed, 20 insertions(+), 26 deletions(-) diff --git a/tracing/README.md b/tracing/README.md index 87242e4cc..0eb311b4f 100644 --- a/tracing/README.md +++ b/tracing/README.md @@ -12,13 +12,13 @@ it. ## Zipkin -[Zipkin] is the most used OSS distributed tracing platform available with -support for many different languages and frameworks. Go kit provides bindings -to the native Go tracing implementation [zipkin-go]. If using Zipkin with Go -kit in a polyglot microservices environment, this is the preferred binding to -use. Instrumentation exists for `kit/transport/http` and `kit/transport/grpc`. -The bindings are highlighted in the [addsvc] example. For more information -regarding Zipkin feel free to visit [Zipkin's Gitter]. +[Zipkin] is one of the most used OSS distributed tracing platforms available +with support for many different languages and frameworks. Go kit provides +bindings to the native Go tracing implementation [zipkin-go]. If using Zipkin +with Go kit in a polyglot microservices environment, this is the preferred +binding to use. Instrumentation exists for `kit/transport/http` and +`kit/transport/grpc`. The bindings are highlighted in the [addsvc] example. For +more information regarding Zipkin feel free to visit [Zipkin's Gitter]. ## OpenTracing @@ -32,8 +32,9 @@ bridge in Go for your back-end exists, it should work out of the box. Please note that the "world view" of existing tracing systems do differ. OpenTracing can not guarantee you that tracing alignment is perfect in a -polyglot microservice environment or switching from one tracing backend to -another truly entails just a change in configuration. +microservice environment especially one which is not exclusively OpenTracing +enabled or switching from one tracing backend to another truly entails just a +change in configuration. The following tracing back-ends are known to work with Go kit through the OpenTracing interface and are highlighted in the [addsvc] example. diff --git a/tracing/doc.go b/tracing/doc.go index 8e5ec4695..2843a858a 100644 --- a/tracing/doc.go +++ b/tracing/doc.go @@ -3,6 +3,5 @@ // As your infrastructure grows, it becomes important to be able to trace a // request, as it travels through multiple services and back to the user. // Package tracing provides endpoints and transport helpers and middlewares to -// capture and emit request-scoped information. We use the excellent OpenTracing -// project to bind to concrete tracing systems. +// capture and emit request-scoped information. package tracing diff --git a/tracing/zipkin/README.md b/tracing/zipkin/README.md index 25549acd8..a37771534 100644 --- a/tracing/zipkin/README.md +++ b/tracing/zipkin/README.md @@ -5,7 +5,7 @@ Great efforts have been made to make [Zipkin] easier to test, develop and experiment against. [Zipkin] can now be run from a single Docker container or by running its self-contained executable jar without extensive configuration. In -its default configuration you will run Zipkin with a HTTP collector, In memory +its default configuration you will run [Zipkin] with a HTTP collector, In memory Span storage backend and web UI on port 9411. Example: @@ -15,23 +15,18 @@ docker run -d -p 9411:9411 openzipkin/zipkin [zipkin]: http://zipkin.io -Instrumenting your services with Zipkin distributed tracing using the default -configuration is now possible with the latest release of [zipkin-go-opentracing] -as it includes an HTTP transport for sending spans to the [Zipkin] HTTP -Collector. - ## Middleware Usage -Follow the [addsvc] example to check out how to wire the Zipkin Middleware. The -changes should be relatively minor. +Follow the [addsvc] example to check out how to wire the [Zipkin] Middleware. +The changes should be relatively minor. -The [zipkin-go] package has Reporters to send Spans to the Zipkin -HTTP and Kafka Collectors. +The [zipkin-go] package has Reporters to send Spans to the [Zipkin] HTTP and +Kafka Collectors. ### Configuring the Zipkin HTTP Reporter -To use the HTTP Reporter with a Zipkin instance running on localhost you -bootstrap zipkin-go like this: +To use the HTTP Reporter with a [Zipkin] instance running on localhost you +bootstrap [zipkin-go] like this: ```go var ( @@ -44,7 +39,7 @@ var ( reporter := zipkin.NewReporter(zipkinHTTPEndpoint) // create our tracer's local endpoint (how the service is identified in Zipkin). -localEndpoint, _ := zipkin.NewEndpoint(serviceName, serviceHostPort) +localEndpoint, err := zipkin.NewEndpoint(serviceName, serviceHostPort) // create our tracer instance. tracer, err = zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(localEndpoint)) @@ -74,8 +69,7 @@ func (svc *Service) GetMeSomeExamples(ctx context.Context, ...) ([]Examples, err query = "select * from example where param = :value" ) - // retrieve the parent span from context to use as parent, if not found we - // start a new trace + // retrieve the parent span from context to use as parent if available. if parentSpan := zipkin.SpanFromContext(ctx); parentSpan != nil { spanContext = parentSpan.Context() } From 298efeb2e74111e1c47a78a558106343936a7cf8 Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Tue, 6 Mar 2018 12:39:13 +0100 Subject: [PATCH 09/11] do not allow to activate both zipkin-go and zipkin-go-opentracing to avoid confusion --- examples/addsvc/cmd/addcli/addcli.go | 2 +- examples/addsvc/cmd/addsvc/addsvc.go | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/examples/addsvc/cmd/addcli/addcli.go b/examples/addsvc/cmd/addcli/addcli.go index fdc5af071..cedc22216 100644 --- a/examples/addsvc/cmd/addcli/addcli.go +++ b/examples/addsvc/cmd/addcli/addcli.go @@ -60,7 +60,7 @@ func main() { // Your clients will probably just use one tracer. var otTracer stdopentracing.Tracer { - if *zipkinV1URL != "" { + if *zipkinV1URL != "" && *zipkinV2URL == "" { collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL) if err != nil { fmt.Fprintln(os.Stderr, err.Error()) diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go index f11fae6c1..6bed9c8f6 100644 --- a/examples/addsvc/cmd/addsvc/addsvc.go +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -69,8 +69,8 @@ func main() { // components that use it, as a dependency. var tracer stdopentracing.Tracer { - if *zipkinV1URL != "" { - logger.Log("tracer", "Zipkin", "URL", *zipkinV1URL) + if *zipkinV1URL != "" && *zipkinV2URL == "" { + logger.Log("tracer", "Zipkin", "type", "OpenTracing", "URL", *zipkinV1URL) collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL) if err != nil { logger.Log("err", err) @@ -98,7 +98,9 @@ func main() { logger.Log("tracer", "Appdash", "addr", *appdashAddr) tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) } else { - logger.Log("tracer", "none") + if *zipkinV2URL == "" { + logger.Log("tracer", "none") + } tracer = stdopentracing.GlobalTracer() // no-op } } @@ -121,6 +123,11 @@ func main() { logger.Log("err", err) os.Exit(1) } + if useNoopTracer { + logger.Log("tracer", "none") + } else { + logger.Log("tracer", "Zipkin", "type", "Native", "URL", *zipkinV2URL) + } } // Create the (sparse) metrics we'll use in the service. They, too, are From cdd9bc5af77ed3efb03110f82de573a2f1d7eaa6 Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Tue, 6 Mar 2018 14:04:30 +0100 Subject: [PATCH 10/11] add test for zipkin http server tracer --- tracing/zipkin/http_test.go | 65 ++++++++++++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/tracing/zipkin/http_test.go b/tracing/zipkin/http_test.go index 15c4d6444..17a10d40c 100644 --- a/tracing/zipkin/http_test.go +++ b/tracing/zipkin/http_test.go @@ -11,8 +11,10 @@ import ( "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/model" + "github.com/openzipkin/zipkin-go/propagation/b3" "github.com/openzipkin/zipkin-go/reporter/recorder" + "github.com/go-kit/kit/endpoint" zipkinkit "github.com/go-kit/kit/tracing/zipkin" kithttp "github.com/go-kit/kit/transport/http" ) @@ -51,21 +53,21 @@ func TestHttpClientTracePropagatesParentSpan(t *testing.T) { _, err := ep(ctx, nil) if err != nil { - t.Fatalf("unwanted error: %s", err.Error()) + t.Fatalf("unexpected error: %s", err.Error()) } spans := rec.Flush() if want, have := 1, len(spans); want != have { - t.Fatalf("incorrect number of spans, wanted %d, got %d", want, have) + t.Fatalf("incorrect number of spans, want %d, have %d", want, have) } span := spans[0] if span.SpanContext.ParentID == nil { - t.Fatalf("incorrect parent ID, got nil") + t.Fatalf("incorrect parent ID, want %s have nil", parentSpan.Context().ID) } if want, have := parentSpan.Context().ID, *span.SpanContext.ParentID; want != have { - t.Fatalf("incorrect parent ID, wanted %s, got %s", want, have) + t.Fatalf("incorrect parent ID, want %s, have %s", want, have) } } @@ -158,3 +160,58 @@ func testHTTPClientTraceCase(t *testing.T, responseStatusCode int, errTagValue s t.Fatalf("invalid tags set, wanted %+v, got %+v", tags, span.Tags) } } + +func TestHTTPServerTrace(t *testing.T) { + rec := recorder.NewReporter() + defer rec.Close() + + // explicitely show we use the default of RPC shared spans in Zipkin as it + // is idiomatic for Zipkin to share span identifiers between client and + // server side. + tr, _ := zipkin.NewTracer(rec, zipkin.WithSharedSpans(true)) + + handler := kithttp.NewServer( + endpoint.Nop, + func(context.Context, *http.Request) (interface{}, error) { return nil, nil }, + func(context.Context, http.ResponseWriter, interface{}) error { return nil }, + zipkinkit.HTTPServerTrace(tr), + ) + + server := httptest.NewServer(handler) + defer server.Close() + + const httpMethod = "GET" + + req, err := http.NewRequest(httpMethod, server.URL, nil) + if err != nil { + t.Fatalf("unable to create HTTP request: %s", err.Error()) + } + + parentSpan := tr.StartSpan("Dummy") + + b3.InjectHTTP(req)(parentSpan.Context()) + + client := http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Fatalf("unable to send HTTP request: %s", err.Error()) + } + resp.Body.Close() + + spans := rec.Flush() + if want, have := 1, len(spans); want != have { + t.Fatalf("incorrect number of spans, want %d, have %d", want, have) + } + + if want, have := parentSpan.Context().TraceID, spans[0].SpanContext.TraceID; want != have { + t.Errorf("incorrect TraceID, want %+v, have %+v", want, have) + } + + if want, have := parentSpan.Context().ID, spans[0].SpanContext.ID; want != have { + t.Errorf("incorrect span ID, want %d, have %d", want, have) + } + + if want, have := httpMethod, spans[0].Name; want != have { + t.Errorf("incorrect span name, want %s, have %s", want, have) + } +} From c147546fbddf87326867c0b94eb321c95c0c398e Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Tue, 6 Mar 2018 16:24:33 +0100 Subject: [PATCH 11/11] add test for zipkin grpc server and client tracers --- examples/addsvc/cmd/addsvc/addsvc.go | 7 +- tracing/zipkin/grpc_test.go | 111 +++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 6 deletions(-) create mode 100644 tracing/zipkin/grpc_test.go diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go index 6bed9c8f6..dbd697129 100644 --- a/examples/addsvc/cmd/addsvc/addsvc.go +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -98,9 +98,6 @@ func main() { logger.Log("tracer", "Appdash", "addr", *appdashAddr) tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) } else { - if *zipkinV2URL == "" { - logger.Log("tracer", "none") - } tracer = stdopentracing.GlobalTracer() // no-op } } @@ -123,9 +120,7 @@ func main() { logger.Log("err", err) os.Exit(1) } - if useNoopTracer { - logger.Log("tracer", "none") - } else { + if !useNoopTracer { logger.Log("tracer", "Zipkin", "type", "Native", "URL", *zipkinV2URL) } } diff --git a/tracing/zipkin/grpc_test.go b/tracing/zipkin/grpc_test.go new file mode 100644 index 000000000..27965ed7b --- /dev/null +++ b/tracing/zipkin/grpc_test.go @@ -0,0 +1,111 @@ +package zipkin_test + +import ( + "context" + "testing" + + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/propagation/b3" + "github.com/openzipkin/zipkin-go/reporter/recorder" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + "github.com/go-kit/kit/endpoint" + kitzipkin "github.com/go-kit/kit/tracing/zipkin" + grpctransport "github.com/go-kit/kit/transport/grpc" +) + +type dummy struct{} + +func unaryInterceptor( + ctx context.Context, method string, req, reply interface{}, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, +) error { + return nil +} + +func TestGRPCClientTrace(t *testing.T) { + rec := recorder.NewReporter() + defer rec.Close() + + tr, _ := zipkin.NewTracer(rec) + + clientTracer := kitzipkin.GRPCClientTrace(tr) + + cc, err := grpc.Dial( + "", + grpc.WithUnaryInterceptor(unaryInterceptor), + grpc.WithInsecure(), + ) + if err != nil { + t.Fatalf("unable to create gRPC dialer: %s", err.Error()) + } + + ep := grpctransport.NewClient( + cc, + "dummyService", + "dummyMethod", + func(context.Context, interface{}) (interface{}, error) { return nil, nil }, + func(context.Context, interface{}) (interface{}, error) { return nil, nil }, + dummy{}, + clientTracer, + ).Endpoint() + + parentSpan := tr.StartSpan("test") + ctx := zipkin.NewContext(context.Background(), parentSpan) + + if _, err = ep(ctx, nil); err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + + spans := rec.Flush() + if want, have := 1, len(spans); want != have { + t.Fatalf("incorrect number of spans, want %d, have %d", want, have) + } + + if spans[0].SpanContext.ParentID == nil { + t.Fatalf("incorrect parent ID, want %s have nil", parentSpan.Context().ID) + } + + if want, have := parentSpan.Context().ID, *spans[0].SpanContext.ParentID; want != have { + t.Fatalf("incorrect parent ID, want %s, have %s", want, have) + } +} + +func TestGRPCServerTrace(t *testing.T) { + rec := recorder.NewReporter() + defer rec.Close() + + tr, _ := zipkin.NewTracer(rec) + + serverTracer := kitzipkin.GRPCServerTrace(tr) + + server := grpctransport.NewServer( + endpoint.Nop, + func(context.Context, interface{}) (interface{}, error) { return nil, nil }, + func(context.Context, interface{}) (interface{}, error) { return nil, nil }, + serverTracer, + ) + + md := metadata.MD{} + parentSpan := tr.StartSpan("test") + + b3.InjectGRPC(&md)(parentSpan.Context()) + + ctx := metadata.NewIncomingContext(context.Background(), md) + server.ServeGRPC(ctx, nil) + + spans := rec.Flush() + + if want, have := 1, len(spans); want != have { + t.Fatalf("incorrect number of spans, want %d, have %d", want, have) + } + + if want, have := parentSpan.Context().TraceID, spans[0].SpanContext.TraceID; want != have { + t.Errorf("incorrect TraceID, want %+v, have %+v", want, have) + } + + if want, have := parentSpan.Context().ID, spans[0].SpanContext.ID; want != have { + t.Errorf("incorrect span ID, want %d, have %d", want, have) + } +}