Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 39 additions & 11 deletions pkg/querier/frontend/frontend.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package frontend

import (
"bytes"
"context"
"flag"
"io"
"io/ioutil"
"math/rand"
"net/http"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/opentracing-contrib/go-stdlib/nethttp"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -56,8 +61,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// Frontend queues HTTP requests, dispatches them to backends, and handles retries
// for requests which failed.
type Frontend struct {
cfg Config
log log.Logger
cfg Config
log log.Logger
tracer nethttp.Transport

mtx sync.Mutex
cond *sync.Cond
Expand All @@ -78,6 +84,7 @@ func New(cfg Config, log log.Logger) (*Frontend, error) {
log: log,
queues: map[string]chan *request{},
}
f.tracer.RoundTripper = f
f.cond = sync.NewCond(&f.mtx)
return f, nil
}
Expand All @@ -93,21 +100,34 @@ func (f *Frontend) Close() {

// ServeHTTP serves HTTP requests.
func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := f.serveHTTP(w, r); err != nil {
r, ht := nethttp.TraceRequest(opentracing.GlobalTracer(), r)
defer ht.Finish()

resp, err := f.tracer.RoundTrip(r)
if err != nil {
server.WriteError(w, err)
return
}

hs := w.Header()
for h, vs := range resp.Header {
hs[h] = vs
}
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}

func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error {
// RoundTrip implement http.Transport.
func (f *Frontend) RoundTrip(r *http.Request) (*http.Response, error) {
ctx := r.Context()
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
return nil, err
}

req, err := server.HTTPRequest(r)
if err != nil {
return err
return nil, err
}

request := &request{
Expand All @@ -120,15 +140,15 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error {
var lastErr error
for tries := 0; tries < f.cfg.MaxRetries; tries++ {
if err := f.queueRequest(userID, request); err != nil {
return err
return nil, err
}

var resp *httpgrpc.HTTPResponse
select {
case <-ctx.Done():
// TODO propagate cancellation.
//request.Cancel()
return errCanceled
return nil, errCanceled

case resp = <-request.response:
case lastErr = <-request.err:
Expand All @@ -142,11 +162,19 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error {
}

retries.Observe(float64(tries))
server.WriteResponse(w, resp)
return nil

httpResp := &http.Response{
StatusCode: int(resp.Code),
Body: ioutil.NopCloser(bytes.NewReader(resp.Body)),
Header: http.Header{},
}
for _, h := range resp.Headers {
httpResp.Header[h.Key] = h.Values
}
return httpResp, nil
}

return lastErr
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Query failed after %d retries", f.cfg.MaxRetries)
}

// Process allows backends to pull requests from the frontend.
Expand Down
17 changes: 16 additions & 1 deletion pkg/util/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/logging"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"
Expand Down Expand Up @@ -98,7 +99,14 @@ func WithContext(ctx context.Context, l log.Logger) log.Logger {
if err != nil {
return l
}
return WithUserID(userID, l)
l = WithUserID(userID, l)

traceID, ok := middleware.ExtractTraceID(ctx)
if !ok {
return l
}

return WithTraceID(traceID, l)
}

// WithUserID returns a Logger that has information about the current user in
Expand All @@ -107,3 +115,10 @@ func WithUserID(userID string, l log.Logger) log.Logger {
// See note in WithContext.
return log.With(l, "org_id", userID)
}

// WithTraceID returns a Logger that has information about the traceID in
// its details.
func WithTraceID(traceID string, l log.Logger) log.Logger {
// See note in WithContext.
return log.With(l, "trace_id", traceID)
}
43 changes: 43 additions & 0 deletions vendor/github.com/weaveworks/common/middleware/http_tracing.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions vendor/github.com/weaveworks/common/middleware/logging.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions vendor/github.com/weaveworks/common/server/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.