diff --git a/Gopkg.lock b/Gopkg.lock index a27f972746f..d12c96d8122 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -918,7 +918,7 @@ [[projects]] branch = "master" - digest = "1:51733e0105994029b3c8386da549def50d733d13938d93fbf870f03ff8c8acad" + digest = "1:7e52f72864ff1d884dbee16153715d71510a3be1b715b313dcf416d5bc9bbe61" name = "github.com/weaveworks/common" packages = [ "aws", @@ -938,7 +938,7 @@ "user", ] pruneopts = "UT" - revision = "1a7a6b3820595d7682b7284682b733733005c57e" + revision = "6a85bf520acfc70202792d295429ffcf10de9e53" [[projects]] digest = "1:efac30de93ca1ff38050f46dc34f1338ebc8778de488f919f79ad9e6188719d3" diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go index c7332e9d83c..5ed5180477a 100644 --- a/pkg/querier/frontend/frontend.go +++ b/pkg/querier/frontend/frontend.go @@ -1,8 +1,11 @@ package frontend import ( + "bytes" "context" "flag" + "io" + "io/ioutil" "math/rand" "net/http" "sync" @@ -10,6 +13,8 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/opentracing-contrib/go-stdlib/nethttp" + opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" @@ -56,8 +61,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Frontend queues HTTP requests, dispatches them to backends, and handles retries // for requests which failed. type Frontend struct { - cfg Config - log log.Logger + cfg Config + log log.Logger + tracer nethttp.Transport mtx sync.Mutex cond *sync.Cond @@ -78,6 +84,7 @@ func New(cfg Config, log log.Logger) (*Frontend, error) { log: log, queues: map[string]chan *request{}, } + f.tracer.RoundTripper = f f.cond = sync.NewCond(&f.mtx) return f, nil } @@ -93,21 +100,34 @@ func (f *Frontend) Close() { // ServeHTTP serves HTTP requests. func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if err := f.serveHTTP(w, r); err != nil { + r, ht := nethttp.TraceRequest(opentracing.GlobalTracer(), r) + defer ht.Finish() + + resp, err := f.tracer.RoundTrip(r) + if err != nil { server.WriteError(w, err) + return + } + + hs := w.Header() + for h, vs := range resp.Header { + hs[h] = vs } + w.WriteHeader(resp.StatusCode) + io.Copy(w, resp.Body) } -func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { +// RoundTrip implement http.Transport. +func (f *Frontend) RoundTrip(r *http.Request) (*http.Response, error) { ctx := r.Context() userID, err := user.ExtractOrgID(ctx) if err != nil { - return err + return nil, err } req, err := server.HTTPRequest(r) if err != nil { - return err + return nil, err } request := &request{ @@ -120,7 +140,7 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { var lastErr error for tries := 0; tries < f.cfg.MaxRetries; tries++ { if err := f.queueRequest(userID, request); err != nil { - return err + return nil, err } var resp *httpgrpc.HTTPResponse @@ -128,7 +148,7 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { case <-ctx.Done(): // TODO propagate cancellation. //request.Cancel() - return errCanceled + return nil, errCanceled case resp = <-request.response: case lastErr = <-request.err: @@ -142,11 +162,19 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { } retries.Observe(float64(tries)) - server.WriteResponse(w, resp) - return nil + + httpResp := &http.Response{ + StatusCode: int(resp.Code), + Body: ioutil.NopCloser(bytes.NewReader(resp.Body)), + Header: http.Header{}, + } + for _, h := range resp.Headers { + httpResp.Header[h.Key] = h.Values + } + return httpResp, nil } - return lastErr + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Query failed after %d retries", f.cfg.MaxRetries) } // Process allows backends to pull requests from the frontend. diff --git a/pkg/util/log.go b/pkg/util/log.go index 0464cbe4344..76e5ac4130e 100644 --- a/pkg/util/log.go +++ b/pkg/util/log.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/logging" + "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" "github.com/weaveworks/common/user" "golang.org/x/net/context" @@ -98,7 +99,14 @@ func WithContext(ctx context.Context, l log.Logger) log.Logger { if err != nil { return l } - return WithUserID(userID, l) + l = WithUserID(userID, l) + + traceID, ok := middleware.ExtractTraceID(ctx) + if !ok { + return l + } + + return WithTraceID(traceID, l) } // WithUserID returns a Logger that has information about the current user in @@ -107,3 +115,10 @@ func WithUserID(userID string, l log.Logger) log.Logger { // See note in WithContext. return log.With(l, "org_id", userID) } + +// WithTraceID returns a Logger that has information about the traceID in +// its details. +func WithTraceID(traceID string, l log.Logger) log.Logger { + // See note in WithContext. + return log.With(l, "trace_id", traceID) +} diff --git a/vendor/github.com/weaveworks/common/middleware/http_tracing.go b/vendor/github.com/weaveworks/common/middleware/http_tracing.go new file mode 100644 index 00000000000..f2676235327 --- /dev/null +++ b/vendor/github.com/weaveworks/common/middleware/http_tracing.go @@ -0,0 +1,43 @@ +package middleware + +import ( + "net/http" + + "github.com/opentracing-contrib/go-stdlib/nethttp" + "github.com/opentracing/opentracing-go" + jaeger "github.com/uber/jaeger-client-go" + "golang.org/x/net/context" +) + +// Tracer is a middleware which traces incoming requests. +type Tracer struct{} + +// Wrap implements Interface +func (t Tracer) Wrap(next http.Handler) http.Handler { + traceHandler := nethttp.Middleware(opentracing.GlobalTracer(), next) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var maybeTracer http.Handler + // Don't try and trace websocket requests because nethttp.Middleware + // doesn't support http.Hijack yet + if IsWSHandshakeRequest(r) { + maybeTracer = next + } else { + maybeTracer = traceHandler + } + maybeTracer.ServeHTTP(w, r) + }) +} + +// ExtractTraceID extracts the trace id, if any from the context. +func ExtractTraceID(ctx context.Context) (string, bool) { + sp := opentracing.SpanFromContext(ctx) + if sp == nil { + return "", false + } + sctx, ok := sp.Context().(jaeger.SpanContext) + if !ok { + return "", false + } + + return sctx.TraceID().String(), true +} diff --git a/vendor/github.com/weaveworks/common/middleware/logging.go b/vendor/github.com/weaveworks/common/middleware/logging.go index c081dbc056e..148a87e73d8 100644 --- a/vendor/github.com/weaveworks/common/middleware/logging.go +++ b/vendor/github.com/weaveworks/common/middleware/logging.go @@ -17,6 +17,11 @@ type Log struct { // logWithRequest information from the request and context as fields. func (l Log) logWithRequest(r *http.Request) logging.Interface { + traceID, ok := ExtractTraceID(r.Context()) + if ok { + l.Log = l.Log.WithField("traceID", traceID) + } + return user.LogWith(r.Context(), l.Log) } diff --git a/vendor/github.com/weaveworks/common/server/server.go b/vendor/github.com/weaveworks/common/server/server.go index d69dc4d5b62..ed11c4ac385 100644 --- a/vendor/github.com/weaveworks/common/server/server.go +++ b/vendor/github.com/weaveworks/common/server/server.go @@ -11,7 +11,6 @@ import ( "github.com/gorilla/mux" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/mwitkow/go-grpc-middleware" - "github.com/opentracing-contrib/go-stdlib/nethttp" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" @@ -140,6 +139,7 @@ func New(cfg Config) (*Server, error) { RegisterInstrumentation(router) } httpMiddleware := []middleware.Interface{ + middleware.Tracer{}, middleware.Log{ Log: log, }, @@ -147,10 +147,8 @@ func New(cfg Config) (*Server, error) { Duration: requestDuration, RouteMatcher: router, }, - middleware.Func(func(handler http.Handler) http.Handler { - return nethttp.Middleware(opentracing.GlobalTracer(), handler) - }), } + httpMiddleware = append(httpMiddleware, cfg.HTTPMiddleware...) httpServer := &http.Server{ ReadTimeout: cfg.HTTPServerReadTimeout,