Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
name = "github.com/aws/aws-sdk-go"
version = "v1.10.8"

# Don't commit - wait until https://github.com/weaveworks/common/pull/124 is merged!
[[constraint]]
name = "github.com/weaveworks/common"
source = "github.com/tomwilkie/weaveworks-common"
branch = "dont-trace-http-on-grpc"

# Need an override on jaeger-lib as it has a ^0.8.0 constraint on client_golang.
[[override]]
name = "github.com/uber/jaeger-lib"
Expand Down
41 changes: 32 additions & 9 deletions pkg/querier/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,22 @@ var (
// Config configures the query frontend.
type Config struct {
// Maximum number of outstanding requests per tenant per frontend;
// requests beyond this error with HTTP 429 (see RegisterFlags).
MaxOutstandingPerTenant int
// Maximum number of retries for a single request; beyond this, the
// downstream error is returned (see RegisterFlags).
MaxRetries int
// When true, split queries by day and execute them in parallel
// (see RegisterFlags; wired up in New via queryRangeRoundTripper).
SplitQueriesByDay bool
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyon this, the downstream error is returned.")
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.BoolVar(&cfg.SplitQueriesByDay, "querier.split-queries-by-day", false, "Split queries by day and execute in parallel.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why day? It's a nice time boundary, but I'm curious if there was reasoning here that makes it the off/on choice, versus being able to specify the range to split on. If someone was using periodic tables with a week range, is there a difference to splitting the query over that time boundary instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! The rows in the index are organised by day, and sub day queries pretty much have to read an entire days index entries anyway. Therefore might as well parallelise by day IMO.

We're running this in prod now and it actually looks like for high-cardinality queries, where the execution of the PromQL is the dominant latency, sub-day parallelism might be worthwhile. In the future we should probably make this tuneable.

}

// Frontend queues HTTP requests, dispatches them to backends, and handles retries
// for requests which failed.
type Frontend struct {
cfg Config
log log.Logger
tracer nethttp.Transport
cfg Config
log log.Logger
roundTripper http.RoundTripper

mtx sync.Mutex
cond *sync.Cond
Expand All @@ -84,7 +86,31 @@ func New(cfg Config, log log.Logger) (*Frontend, error) {
log: log,
queues: map[string]chan *request{},
}
f.tracer.RoundTripper = f

// We need to do the opentracing at the leafs of the roundtrippers, as a
// single request could turn into multiple requests.
tracingRoundTripper := &nethttp.Transport{
RoundTripper: f,
}
var roundTripper http.RoundTripper = RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
req, ht := nethttp.TraceRequest(opentracing.GlobalTracer(), req)
defer ht.Finish()

return tracingRoundTripper.RoundTrip(req)
})

if cfg.SplitQueriesByDay {
roundTripper = &queryRangeRoundTripper{
downstream: roundTripper,
queryRangeMiddleware: &splitByDay{
downstream: &queryRangeTerminator{
downstream: roundTripper,
},
},
}
}

f.roundTripper = roundTripper
f.cond = sync.NewCond(&f.mtx)
return f, nil
}
Expand All @@ -100,10 +126,7 @@ func (f *Frontend) Close() {

// ServeHTTP serves HTTP requests.
func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r, ht := nethttp.TraceRequest(opentracing.GlobalTracer(), r)
defer ht.Finish()

resp, err := f.tracer.RoundTrip(r)
resp, err := f.roundTripper.RoundTrip(r)
if err != nil {
server.WriteError(w, err)
return
Expand Down
64 changes: 62 additions & 2 deletions pkg/querier/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ import (
"testing"

"github.com/go-kit/kit/log"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing-contrib/go-stdlib/nethttp"
opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
jaeger "github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
"google.golang.org/grpc"
Expand Down Expand Up @@ -70,6 +75,55 @@ func TestFrontendRetries(t *testing.T) {
testFrontend(t, handler, test)
}

// TestFrontendPropagateTrace verifies that the trace ID started on the client
// side is propagated through the frontend to the downstream handler(s).
func TestFrontendPropagateTrace(t *testing.T) {
// Install a global Jaeger tracer for the duration of the test.
closer, err := config.Configuration{}.InitGlobalTracer("test")
require.NoError(t, err)
defer closer.Close()

// Buffered so the handler never blocks; capacity 2 because the split-by-day
// config in testFrontend is expected to fan one query out into two calls.
observedTraceID := make(chan string, 2)

// Downstream handler: extract the server-side span and record its trace ID.
handler := middleware.Tracer{}.Wrap(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sp := opentracing.SpanFromContext(r.Context())
defer sp.Finish()

traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID())
observedTraceID <- traceID

w.Write([]byte(responseBody))
}))

test := func(addr string) {
// Start a client-side span; its trace ID is what we expect downstream.
sp, ctx := opentracing.StartSpanFromContext(context.Background(), "client")
defer sp.Finish()
traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID())

req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", addr, query), nil)
require.NoError(t, err)
req = req.WithContext(ctx)
// The frontend requires an org ID; inject it into the request headers.
err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(ctx, "1"), req)
require.NoError(t, err)

// Attach trace headers to the outgoing request so the server side can
// join the same trace.
req, tr := nethttp.TraceRequest(opentracing.GlobalTracer(), req)
defer tr.Finish()

client := http.Client{
Transport: &nethttp.Transport{},
}
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode)

// Drain and close the body so the transport can reuse the connection.
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
require.NoError(t, err)

// Query should do two calls (testFrontend enables SplitQueriesByDay);
// both downstream calls must carry the client's trace ID.
assert.Equal(t, traceID, <-observedTraceID)
assert.Equal(t, traceID, <-observedTraceID)
}
testFrontend(t, handler, test)
}

func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
logger := log.NewNopLogger() //log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

Expand All @@ -78,6 +132,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
workerConfig WorkerConfig
)
util.DefaultValues(&config, &workerConfig)
config.SplitQueriesByDay = true

grpcListen, err := net.Listen("tcp", "")
require.NoError(t, err)
Expand All @@ -89,13 +144,18 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
frontend, err := New(config, logger)
require.NoError(t, err)

grpcServer := grpc.NewServer()
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())),
)
defer grpcServer.GracefulStop()

RegisterFrontendServer(grpcServer, frontend)

httpServer := http.Server{
Handler: middleware.AuthenticateUser.Wrap(frontend),
Handler: middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(frontend),
}
defer httpServer.Shutdown(context.Background())

Expand Down
Loading