From 3502fb4e849ad5edc33c231642ab80b89eff29ab Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 10 Sep 2018 13:02:25 +0100 Subject: [PATCH] Split incoming requests by day and run them in parallel. - Generic code to parse incoming query_range requests, mutate them and round trip them. - Split queries along day boundaries, modulo step. - Run queries in parallel and combine their results. - Ensure we propagate org ids correctly; add e2e tests. - Take care to ensure we propagate trace IDs correctly; involved updating weaveworks/common. Signed-off-by: Tom Wilkie --- Gopkg.lock | 10 +- Gopkg.toml | 6 + pkg/querier/frontend/frontend.go | 41 ++- pkg/querier/frontend/frontend_test.go | 64 +++- pkg/querier/frontend/roundtrip.go | 283 ++++++++++++++++ pkg/querier/frontend/roundtrip_test.go | 203 ++++++++++++ pkg/querier/frontend/split_by_day.go | 193 +++++++++++ pkg/querier/frontend/split_by_day_test.go | 301 ++++++++++++++++++ pkg/querier/frontend/worker.go | 3 - .../common/httpgrpc/server/server.go | 12 +- .../weaveworks/common/logging/logging.go | 19 +- .../weaveworks/common/logging/logrus.go | 14 +- 12 files changed, 1106 insertions(+), 43 deletions(-) create mode 100644 pkg/querier/frontend/roundtrip.go create mode 100644 pkg/querier/frontend/roundtrip_test.go create mode 100644 pkg/querier/frontend/split_by_day.go create mode 100644 pkg/querier/frontend/split_by_day_test.go diff --git a/Gopkg.lock b/Gopkg.lock index d12c96d8122..eebfd020f8b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -917,8 +917,8 @@ revision = "be0d55e547b147ea1817f037cab9458bf7fc7850" [[projects]] - branch = "master" - digest = "1:7e52f72864ff1d884dbee16153715d71510a3be1b715b313dcf416d5bc9bbe61" + branch = "dont-trace-http-on-grpc" + digest = "1:532adf732d56ddee98d6953e626b23befed7abbcee0d5e985aed2e4b46ea565f" name = "github.com/weaveworks/common" packages = [ "aws", @@ -938,7 +938,8 @@ "user", ] pruneopts = "UT" - revision = "6a85bf520acfc70202792d295429ffcf10de9e53" + revision = 
"82ab3ef4581bf3d7c2d662193ea4bee246541b91" + source = "github.com/tomwilkie/weaveworks-common" [[projects]] digest = "1:efac30de93ca1ff38050f46dc34f1338ebc8778de488f919f79ad9e6188719d3" @@ -1385,6 +1386,7 @@ "github.com/prometheus/prometheus/rules", "github.com/prometheus/prometheus/scrape", "github.com/prometheus/prometheus/storage", + "github.com/prometheus/prometheus/util/stats", "github.com/prometheus/prometheus/util/strutil", "github.com/prometheus/prometheus/web/api/v1", "github.com/prometheus/tsdb", @@ -1392,6 +1394,8 @@ "github.com/segmentio/fasthash/fnv1a", "github.com/stretchr/testify/assert", "github.com/stretchr/testify/require", + "github.com/uber/jaeger-client-go", + "github.com/uber/jaeger-client-go/config", "github.com/weaveworks/billing-client", "github.com/weaveworks/common/aws", "github.com/weaveworks/common/errors", diff --git a/Gopkg.toml b/Gopkg.toml index 663625f7f9f..9e580407420 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -6,6 +6,12 @@ name = "github.com/aws/aws-sdk-go" version = "v1.10.8" +# Don't commit - wait until https://github.com/weaveworks/common/pull/124 is merged! +[[constraint]] + name = "github.com/weaveworks/common" + source = "github.com/tomwilkie/weaveworks-common" + branch = "dont-trace-http-on-grpc" + # Need an override on jaeger-lib as it has a ^0.8.0 contraint on client_golang. [[override]] name = "github.com/uber/jaeger-lib" diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go index 5ed5180477a..f2446a76702 100644 --- a/pkg/querier/frontend/frontend.go +++ b/pkg/querier/frontend/frontend.go @@ -50,20 +50,22 @@ var ( type Config struct { MaxOutstandingPerTenant int MaxRetries int + SplitQueriesByDay bool } // RegisterFlags adds the flags required to config this to the given FlagSet. 
func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.") - f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyon this, the downstream error is returned.") + f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.") + f.BoolVar(&cfg.SplitQueriesByDay, "querier.split-queries-by-day", false, "Split queries by day and execute in parallel.") } // Frontend queues HTTP requests, dispatches them to backends, and handles retries // for requests which failed. type Frontend struct { - cfg Config - log log.Logger - tracer nethttp.Transport + cfg Config + log log.Logger + roundTripper http.RoundTripper mtx sync.Mutex cond *sync.Cond @@ -84,7 +86,31 @@ func New(cfg Config, log log.Logger) (*Frontend, error) { log: log, queues: map[string]chan *request{}, } - f.tracer.RoundTripper = f + + // We need to do the opentracing at the leaves of the roundtrippers, as a + // single request could turn into multiple requests. + tracingRoundTripper := &nethttp.Transport{ + RoundTripper: f, + } + var roundTripper http.RoundTripper = RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + req, ht := nethttp.TraceRequest(opentracing.GlobalTracer(), req) + defer ht.Finish() + + return tracingRoundTripper.RoundTrip(req) + }) + + if cfg.SplitQueriesByDay { + roundTripper = &queryRangeRoundTripper{ + downstream: roundTripper, + queryRangeMiddleware: &splitByDay{ + downstream: &queryRangeTerminator{ + downstream: roundTripper, + }, + }, + } + } + + f.roundTripper = roundTripper f.cond = sync.NewCond(&f.mtx) return f, nil } @@ -100,10 +126,7 @@ func (f *Frontend) Close() { // ServeHTTP serves HTTP requests. 
func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) { - r, ht := nethttp.TraceRequest(opentracing.GlobalTracer(), r) - defer ht.Finish() - - resp, err := f.tracer.RoundTrip(r) + resp, err := f.roundTripper.RoundTrip(r) if err != nil { server.WriteError(w, err) return diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go index 01971f79b52..4fd722ce5bd 100644 --- a/pkg/querier/frontend/frontend_test.go +++ b/pkg/querier/frontend/frontend_test.go @@ -10,8 +10,13 @@ import ( "testing" "github.com/go-kit/kit/log" + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/opentracing-contrib/go-stdlib/nethttp" + opentracing "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + jaeger "github.com/uber/jaeger-client-go" + "github.com/uber/jaeger-client-go/config" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "google.golang.org/grpc" @@ -70,6 +75,55 @@ func TestFrontendRetries(t *testing.T) { testFrontend(t, handler, test) } +func TestFrontendPropagateTrace(t *testing.T) { + closer, err := config.Configuration{}.InitGlobalTracer("test") + require.NoError(t, err) + defer closer.Close() + + observedTraceID := make(chan string, 2) + + handler := middleware.Tracer{}.Wrap(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(r.Context()) + defer sp.Finish() + + traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID()) + observedTraceID <- traceID + + w.Write([]byte(responseBody)) + })) + + test := func(addr string) { + sp, ctx := opentracing.StartSpanFromContext(context.Background(), "client") + defer sp.Finish() + traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID()) + + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", addr, query), nil) + require.NoError(t, err) + req = req.WithContext(ctx) + err = 
user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(ctx, "1"), req) + require.NoError(t, err) + + req, tr := nethttp.TraceRequest(opentracing.GlobalTracer(), req) + defer tr.Finish() + + client := http.Client{ + Transport: &nethttp.Transport{}, + } + resp, err := client.Do(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + defer resp.Body.Close() + _, err = ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + // Query should do two calls. + assert.Equal(t, traceID, <-observedTraceID) + assert.Equal(t, traceID, <-observedTraceID) + } + testFrontend(t, handler, test) +} + func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { logger := log.NewNopLogger() //log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) @@ -78,6 +132,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { workerConfig WorkerConfig ) util.DefaultValues(&config, &workerConfig) + config.SplitQueriesByDay = true grpcListen, err := net.Listen("tcp", "") require.NoError(t, err) @@ -89,13 +144,18 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { frontend, err := New(config, logger) require.NoError(t, err) - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer( + grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())), + ) defer grpcServer.GracefulStop() RegisterFrontendServer(grpcServer, frontend) httpServer := http.Server{ - Handler: middleware.AuthenticateUser.Wrap(frontend), + Handler: middleware.Merge( + middleware.AuthenticateUser, + middleware.Tracer{}, + ).Wrap(frontend), } defer httpServer.Shutdown(context.Background()) diff --git a/pkg/querier/frontend/roundtrip.go b/pkg/querier/frontend/roundtrip.go new file mode 100644 index 00000000000..96d8f4548f8 --- /dev/null +++ b/pkg/querier/frontend/roundtrip.go @@ -0,0 +1,283 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you 
may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mostly lifted from prometheus/web/api/v1/api.go. + +package frontend + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "math" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/go-kit/kit/log/level" + jsoniter "github.com/json-iterator/go" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/util/stats" + + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/util" +) + +var ( + errEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "end timestamp must not be before start time") + errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer") + errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") +) + +// RoundTripperFunc is like http.HandlerFunc, but for http.RoundTripper. +type RoundTripperFunc func(*http.Request) (*http.Response, error) + +// RoundTrip implements http.RoundTripper. 
+func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} + +type queryRangeMiddleware interface { + Do(context.Context, *queryRangeRequest) (*apiResponse, error) +} + +type queryRangeRoundTripper struct { + downstream http.RoundTripper + queryRangeMiddleware queryRangeMiddleware +} + +func (q queryRangeRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + if !strings.HasSuffix(r.URL.Path, "/query_range") { + return q.downstream.RoundTrip(r) + } + + request, err := parseQueryRangeRequest(r) + if err != nil { + return nil, err + } + + response, err := q.queryRangeMiddleware.Do(r.Context(), request) + if err != nil { + return nil, err + } + + return response.toHTTPResponse() +} + +type queryRangeTerminator struct { + downstream http.RoundTripper +} + +func (q queryRangeTerminator) Do(ctx context.Context, r *queryRangeRequest) (*apiResponse, error) { + request, err := r.toHTTPRequest(ctx) + if err != nil { + return nil, err + } + + if err := user.InjectOrgIDIntoHTTPRequest(ctx, request); err != nil { + return nil, err + } + + response, err := q.downstream.RoundTrip(request) + if err != nil { + return nil, err + } + defer response.Body.Close() + + return parseQueryRangeResponse(response) +} + +type queryRangeRequest struct { + path string + start, end int64 // Milliseconds since epoch. + step int64 // Milliseconds. 
+ timeout time.Duration + query string +} + +func parseQueryRangeRequest(r *http.Request) (*queryRangeRequest, error) { + var result queryRangeRequest + var err error + result.start, err = parseTime(r.FormValue("start")) + if err != nil { + return nil, err + } + + result.end, err = parseTime(r.FormValue("end")) + if err != nil { + return nil, err + } + + if result.end < result.start { + return nil, errEndBeforeStart + } + + result.step, err = parseDurationMs(r.FormValue("step")) + if err != nil { + return nil, err + } + + if result.step <= 0 { + return nil, errNegativeStep + } + + // For safety, limit the number of returned points per timeseries. + // This is sufficient for 60s resolution for a week or 1h resolution for a year. + if (result.end-result.start)/result.step > 11000 { + return nil, errStepTooSmall + } + + result.query = r.FormValue("query") + result.path = r.URL.Path + return &result, nil +} + +func (q queryRangeRequest) toHTTPRequest(ctx context.Context) (*http.Request, error) { + params := url.Values{ + "start": []string{encodeTime(q.start)}, + "end": []string{encodeTime(q.end)}, + "step": []string{encodeDurationMs(q.step)}, + "query": []string{q.query}, + } + u := &url.URL{ + Path: q.path, + RawQuery: params.Encode(), + } + req := &http.Request{ + Method: "GET", + RequestURI: u.String(), // This is what the httpgrpc code looks at. 
+ URL: u, + Body: http.NoBody, + Header: http.Header{}, + } + + return req.WithContext(ctx), nil +} + +func parseTime(s string) (int64, error) { + if t, err := strconv.ParseFloat(s, 64); err == nil { + s, ns := math.Modf(t) + tm := time.Unix(int64(s), int64(ns*float64(time.Second))) + return tm.UnixNano() / int64(time.Millisecond/time.Nanosecond), nil + } + if t, err := time.Parse(time.RFC3339Nano, s); err == nil { + return t.UnixNano() / int64(time.Millisecond/time.Nanosecond), nil + } + return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s) +} + +func parseDurationMs(s string) (int64, error) { + if d, err := strconv.ParseFloat(s, 64); err == nil { + ts := d * float64(time.Second/time.Millisecond) + if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) { + return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid duration. It overflows int64", s) + } + return int64(ts), nil + } + if d, err := model.ParseDuration(s); err == nil { + return int64(d) / int64(time.Millisecond/time.Nanosecond), nil + } + return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid duration", s) +} + +func encodeTime(t int64) string { + f := float64(t) / 1.0e3 + return strconv.FormatFloat(f, 'f', -1, 64) +} + +func encodeDurationMs(d int64) string { + return strconv.FormatFloat(float64(d)/float64(time.Second/time.Millisecond), 'f', -1, 64) +} + +const statusSuccess = "success" + +type apiResponse struct { + Status string `json:"status"` + Data queryRangeResponse `json:"data,omitempty"` + ErrorType string `json:"errorType,omitempty"` + Error string `json:"error,omitempty"` +} + +func parseQueryRangeResponse(r *http.Response) (*apiResponse, error) { + if r.StatusCode/100 != 2 { + body, _ := ioutil.ReadAll(r.Body) + return nil, httpgrpc.Errorf(r.StatusCode, string(body)) + } + + var resp apiResponse + if err := json.NewDecoder(r.Body).Decode(&resp); err != nil { + return nil, 
httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + } + return &resp, nil +} + +func (a *apiResponse) toHTTPResponse() (*http.Response, error) { + json := jsoniter.ConfigCompatibleWithStandardLibrary + b, err := json.Marshal(a) + if err != nil { + level.Error(util.Logger).Log("msg", "error marshalling json response", "err", err) + return nil, err + } + resp := http.Response{ + Header: http.Header{ + "Content-Type": []string{"application/json"}, + }, + Body: ioutil.NopCloser(bytes.NewBuffer(b)), + StatusCode: http.StatusOK, + } + return &resp, nil +} + +// queryRangeResponse contains result data for a query_range. +type queryRangeResponse struct { + ResultType model.ValueType `json:"resultType"` + Result model.Value `json:"result"` + Stats *stats.QueryStats `json:"stats,omitempty"` +} + +func (q *queryRangeResponse) UnmarshalJSON(b []byte) error { + v := struct { + ResultType model.ValueType `json:"resultType"` + Stats *stats.QueryStats `json:"stats,omitempty"` + Result json.RawMessage `json:"result"` + }{} + + err := json.Unmarshal(b, &v) + if err != nil { + return err + } + + q.ResultType = v.ResultType + q.Stats = v.Stats + + switch v.ResultType { + case model.ValVector: + var vv model.Vector + err = json.Unmarshal(v.Result, &vv) + q.Result = vv + + case model.ValMatrix: + var mv model.Matrix + err = json.Unmarshal(v.Result, &mv) + q.Result = mv + + default: + err = fmt.Errorf("unexpected value type %q", v.ResultType) + } + return err +} diff --git a/pkg/querier/frontend/roundtrip_test.go b/pkg/querier/frontend/roundtrip_test.go new file mode 100644 index 00000000000..6f8c6bbc996 --- /dev/null +++ b/pkg/querier/frontend/roundtrip_test.go @@ -0,0 +1,203 @@ +package frontend + +import ( + "bytes" + "context" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + 
"github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" +) + +const ( + query = "/api/v1/query_range?end=1536760200&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&step=120" + responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{},"values":[[1536763606.651,"137"],[1536763607.651,"137"]]}]}}` +) + +var parsedResponse = &apiResponse{ + Status: "success", + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{}, + Values: []model.SamplePair{ + {1536763606651, 137}, + {1536763607651, 137}, + }, + }, + }, + }, +} + +func TestQueryRangeRequest(t *testing.T) { + for i, tc := range []struct { + url string + expected *queryRangeRequest + expectedErr error + }{ + { + url: query, + expected: &queryRangeRequest{ + path: "/api/v1/query_range", + start: 1536673680 * 1e3, + end: 1536760200 * 1e3, + step: 120 * 1e3, + query: "sum(container_memory_rss) by (namespace)", + }, + }, + { + url: "api/v1/query_range?start=foo", + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "cannot parse \"foo\" to a valid timestamp"), + }, + { + url: "api/v1/query_range?start=123&end=bar", + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "cannot parse \"bar\" to a valid timestamp"), + }, + { + url: "api/v1/query_range?start=123&end=0", + expectedErr: errEndBeforeStart, + }, + { + url: "api/v1/query_range?start=123&end=456&step=baz", + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "cannot parse \"baz\" to a valid duration"), + }, + { + url: "api/v1/query_range?start=123&end=456&step=-1", + expectedErr: errNegativeStep, + }, + { + url: "api/v1/query_range?start=0&end=11001&step=1", + expectedErr: errStepTooSmall, + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + r, err := http.NewRequest("GET", tc.url, nil) + require.NoError(t, err) + + ctx := user.InjectOrgID(context.Background(), 
"1") + r = r.WithContext(ctx) + + req, err := parseQueryRangeRequest(r) + if err != nil { + require.EqualValues(t, tc.expectedErr, err) + return + } + require.EqualValues(t, tc.expected, req) + + rdash, err := req.toHTTPRequest(context.Background()) + require.NoError(t, err) + require.EqualValues(t, tc.url, rdash.RequestURI) + }) + } +} + +func TestQueryRangeResponse(t *testing.T) { + for i, tc := range []struct { + body string + expected *apiResponse + }{ + { + body: responseBody, + expected: parsedResponse, + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + response := &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))), + } + resp, err := parseQueryRangeResponse(response) + require.NoError(t, err) + assert.Equal(t, tc.expected, resp) + + // Reset response, as the above call will have consumed the body reader. + response = &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))), + } + resp2, err := resp.toHTTPResponse() + require.NoError(t, err) + assert.Equal(t, response, resp2) + }) + } +} + +func TestRoundTrip(t *testing.T) { + s := httptest.NewServer( + middleware.AuthenticateUser.Wrap( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.RequestURI == query { + w.Write([]byte(responseBody)) + } else { + w.Write([]byte("bar")) + } + }), + ), + ) + defer s.Close() + + u, err := url.Parse(s.URL) + require.NoError(t, err) + + downstream := singleHostRoundTripper{ + host: u.Host, + downstream: http.DefaultTransport, + } + roundtripper := queryRangeRoundTripper{ + downstream: downstream, + queryRangeMiddleware: queryRangeTerminator{ + downstream: downstream, + }, + } + + for i, tc := range []struct { + path, expectedBody string + }{ + {"/foo", "bar"}, + {query, responseBody}, + } { + t.Run(strconv.Itoa(i), func(t 
*testing.T) { + req, err := http.NewRequest("GET", tc.path, http.NoBody) + require.NoError(t, err) + + // query-frontend doesn't actually authenticate requests, we rely on + // the queriers to do this. Hence we ensure the request doesn't have a + // org ID in the ctx, but does have the header. + ctx := user.InjectOrgID(context.Background(), "1") + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + resp, err := roundtripper.RoundTrip(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + bs, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedBody, string(bs)) + }) + } +} + +type singleHostRoundTripper struct { + host string + downstream http.RoundTripper +} + +func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + r.URL.Scheme = "http" + r.URL.Host = s.host + return s.downstream.RoundTrip(r) +} diff --git a/pkg/querier/frontend/split_by_day.go b/pkg/querier/frontend/split_by_day.go new file mode 100644 index 00000000000..780fe2801e9 --- /dev/null +++ b/pkg/querier/frontend/split_by_day.go @@ -0,0 +1,193 @@ +package frontend + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/prometheus/common/model" +) + +const millisecondPerDay = int64(24 * time.Hour / time.Millisecond) + +type splitByDay struct { + downstream queryRangeMiddleware +} + +type response struct { + req queryRangeRequest + resp *apiResponse + err error +} + +func (s splitByDay) Do(ctx context.Context, r *queryRangeRequest) (*apiResponse, error) { + // First we're going to build new requests, one for each day, taking care + // to line up the boundaries with step. + reqs := splitQuery(r) + + // Next, do the requests in parallel. + // If one of the requests fail, we want to be able to cancel the rest of them. 
+ ctx, cancel := context.WithCancel(ctx) + defer cancel() + + resps := make(chan *apiResponse) + errs := make(chan error) + for _, req := range reqs { + go func(req *queryRangeRequest) { + resp, err := s.downstream.Do(ctx, req) + if err != nil { + errs <- err + } else { + resps <- resp + } + }(req) + } + + // Gather up the responses and errors. + var responses []*apiResponse + var firstErr error + for range reqs { + select { + case resp := <-resps: + responses = append(responses, resp) + case err := <-errs: + // Only record the first error, as subsequent errors are cancellations. + if firstErr == nil { + firstErr = err + cancel() + } + } + } + + if firstErr != nil { + return nil, firstErr + } + + return mergeAPIResponses(responses) +} + +func splitQuery(r *queryRangeRequest) []*queryRangeRequest { + reqs := []*queryRangeRequest{} + for start := r.start; start < r.end; start = nextDayBoundary(start, r.step) + r.step { + end := nextDayBoundary(start, r.step) + if end+r.step >= r.end { + end = r.end + } + + reqs = append(reqs, &queryRangeRequest{ + path: r.path, + start: start, + end: end, + step: r.step, + query: r.query, + }) + } + return reqs +} + +// Round up to the step before the next day boundary. +func nextDayBoundary(t, step int64) int64 { + offsetToDayBoundary := step - (t % millisecondPerDay % step) + t = ((t / millisecondPerDay) + 1) * millisecondPerDay + return t - offsetToDayBoundary +} + +func mergeAPIResponses(responses []*apiResponse) (*apiResponse, error) { + // Merge the responses. 
+ sort.Sort(byFirstTime(responses)) + + if len(responses) == 0 { + return &apiResponse{ + Status: statusSuccess, + }, nil + } + + switch responses[0].Data.Result.(type) { + case model.Vector: + return vectorMerge(responses) + case model.Matrix: + return matrixMerge(responses) + default: + return nil, fmt.Errorf("unexpected response type") + } +} + +type byFirstTime []*apiResponse + +func (a byFirstTime) Len() int { return len(a) } +func (a byFirstTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byFirstTime) Less(i, j int) bool { return minTime(a[i]) < minTime(a[j]) } + +func minTime(resp *apiResponse) model.Time { + switch result := resp.Data.Result.(type) { + case model.Vector: + if len(result) == 0 { + return -1 + } + return result[0].Timestamp + + case model.Matrix: + if len(result) == 0 { + return -1 + } + if len(result[0].Values) == 0 { + return -1 + } + return result[0].Values[0].Timestamp + + default: + return -1 + } +} + +func vectorMerge(resps []*apiResponse) (*apiResponse, error) { + var output model.Vector + for _, resp := range resps { + output = append(output, resp.Data.Result.(model.Vector)...) + } + return &apiResponse{ + Status: statusSuccess, + Data: queryRangeResponse{ + ResultType: model.ValVector, + Result: output, + }, + }, nil +} + +func matrixMerge(resps []*apiResponse) (*apiResponse, error) { + output := map[string]*model.SampleStream{} + for _, resp := range resps { + matrix := resp.Data.Result.(model.Matrix) + for _, stream := range matrix { + metric := stream.Metric.String() + existing, ok := output[metric] + if !ok { + existing = &model.SampleStream{ + Metric: stream.Metric, + } + } + existing.Values = append(existing.Values, stream.Values...) 
+ output[metric] = existing + } + } + + keys := make([]string, 0, len(output)) + for key := range output { + keys = append(keys, key) + } + sort.Strings(keys) + + result := make(model.Matrix, 0, len(output)) + for _, key := range keys { + result = append(result, output[key]) + } + + return &apiResponse{ + Status: statusSuccess, + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: result, + }, + }, nil +} diff --git a/pkg/querier/frontend/split_by_day_test.go b/pkg/querier/frontend/split_by_day_test.go new file mode 100644 index 00000000000..0009db03a2f --- /dev/null +++ b/pkg/querier/frontend/split_by_day_test.go @@ -0,0 +1,301 @@ +package frontend + +import ( + "context" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" +) + +const seconds = 1e3 // 1e3 milliseconds per second. 
+ +func TestNextDayBoundary(t *testing.T) { + for i, tc := range []struct { + in, step, out int64 + }{ + {0, 1, millisecondPerDay - 1}, + {0, 15 * seconds, millisecondPerDay - 15*seconds}, + {1 * seconds, 15 * seconds, millisecondPerDay - (15-1)*seconds}, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + require.Equal(t, tc.out, nextDayBoundary(tc.in, tc.step)) + }) + } +} + +func TestSplitQuery(t *testing.T) { + for i, tc := range []struct { + input *queryRangeRequest + expected []*queryRangeRequest + }{ + { + input: &queryRangeRequest{ + start: 0, + end: 60 * 60 * seconds, + step: 15 * seconds, + query: "foo", + }, + expected: []*queryRangeRequest{ + { + start: 0, + end: 60 * 60 * seconds, + step: 15 * seconds, + query: "foo", + }, + }, + }, + { + input: &queryRangeRequest{ + start: 0, + end: 24 * 3600 * seconds, + step: 15 * seconds, + query: "foo", + }, + expected: []*queryRangeRequest{ + { + start: 0, + end: 24 * 3600 * seconds, + step: 15 * seconds, + query: "foo", + }, + }, + }, + { + input: &queryRangeRequest{ + start: 0, + end: 2 * 24 * 3600 * seconds, + step: 15 * seconds, + query: "foo", + }, + expected: []*queryRangeRequest{ + { + start: 0, + end: (24 * 3600 * seconds) - (15 * seconds), + step: 15 * seconds, + query: "foo", + }, + { + start: 24 * 3600 * seconds, + end: 2 * 24 * 3600 * seconds, + step: 15 * seconds, + query: "foo", + }, + }, + }, + { + input: &queryRangeRequest{ + start: 3 * 3600 * seconds, + end: 3 * 24 * 3600 * seconds, + step: 15 * seconds, + query: "foo", + }, + expected: []*queryRangeRequest{ + { + start: 3 * 3600 * seconds, + end: (24 * 3600 * seconds) - (15 * seconds), + step: 15 * seconds, + query: "foo", + }, + { + start: 24 * 3600 * seconds, + end: (2 * 24 * 3600 * seconds) - (15 * seconds), + step: 15 * seconds, + query: "foo", + }, + { + start: 2 * 24 * 3600 * seconds, + end: 3 * 24 * 3600 * seconds, + step: 15 * seconds, + query: "foo", + }, + }, + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + require.Equal(t, 
tc.expected, splitQuery(tc.input)) + }) + } +} + +func TestMergeAPIResponses(t *testing.T) { + for i, tc := range []struct { + input []*apiResponse + expected *apiResponse + }{ + // No responses shouldn't panic. + { + input: []*apiResponse{}, + expected: &apiResponse{ + Status: statusSuccess, + }, + }, + + // A single empty response shouldn't panic. + { + input: []*apiResponse{ + { + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: model.Matrix{}, + }, + }, + }, + expected: &apiResponse{ + Status: statusSuccess, + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: model.Matrix{}, + }, + }, + }, + + // Multiple empty responses shouldn't panic. + { + input: []*apiResponse{ + { + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: model.Matrix{}, + }, + }, + { + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: model.Matrix{}, + }, + }, + }, + expected: &apiResponse{ + Status: statusSuccess, + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: model.Matrix{}, + }, + }, + }, + + // Multiple empty responses shouldn't panic. 
+ { + input: []*apiResponse{ + { + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: model.Matrix{ + { + Metric: model.Metric{}, + Values: []model.SamplePair{ + {0, 0}, + {1, 1}, + }, + }, + }, + }, + }, + { + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: model.Matrix{ + { + Metric: model.Metric{}, + Values: []model.SamplePair{ + {2, 2}, + {3, 3}, + }, + }, + }, + }, + }, + }, + expected: &apiResponse{ + Status: statusSuccess, + Data: queryRangeResponse{ + ResultType: model.ValMatrix, + Result: model.Matrix{ + { + Metric: model.Metric{}, + Values: []model.SamplePair{ + {0, 0}, + {1, 1}, + {2, 2}, + {3, 3}, + }, + }, + }, + }, + }, + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + output, err := mergeAPIResponses(tc.input) + require.NoError(t, err) + require.Equal(t, tc.expected, output) + }) + } +} + +func TestSplitByDay(t *testing.T) { + s := httptest.NewServer( + middleware.AuthenticateUser.Wrap( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(responseBody)) + }), + ), + ) + defer s.Close() + + u, err := url.Parse(s.URL) + require.NoError(t, err) + + roundtripper := queryRangeRoundTripper{ + queryRangeMiddleware: splitByDay{ + downstream: queryRangeTerminator{ + downstream: singleHostRoundTripper{ + host: u.Host, + downstream: http.DefaultTransport, + }, + }, + }, + } + + mergedResponse, err := mergeAPIResponses([]*apiResponse{ + parsedResponse, + parsedResponse, + }) + require.NoError(t, err) + + mergedHTTPResponse, err := mergedResponse.toHTTPResponse() + require.NoError(t, err) + + mergedHTTPResponseBody, err := ioutil.ReadAll(mergedHTTPResponse.Body) + require.NoError(t, err) + + for i, tc := range []struct { + path, expectedBody string + }{ + {query, string(mergedHTTPResponseBody)}, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + req, err := http.NewRequest("GET", tc.path, http.NoBody) + require.NoError(t, err) + + ctx := user.InjectOrgID(context.Background(), "1") + req 
= req.WithContext(ctx) + + resp, err := roundtripper.RoundTrip(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + bs, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedBody, string(bs)) + }) + } +} diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 03385aaa1d3..34dbbef2cd8 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -9,9 +9,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/mwitkow/go-grpc-middleware" - "github.com/opentracing/opentracing-go" "google.golang.org/grpc" "google.golang.org/grpc/naming" @@ -213,7 +211,6 @@ func connect(address string) (FrontendClient, error) { address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.ClientUserHeaderInterceptor, )), ) diff --git a/vendor/github.com/weaveworks/common/httpgrpc/server/server.go b/vendor/github.com/weaveworks/common/httpgrpc/server/server.go index 7695523e1a5..8f556c3abb6 100644 --- a/vendor/github.com/weaveworks/common/httpgrpc/server/server.go +++ b/vendor/github.com/weaveworks/common/httpgrpc/server/server.go @@ -14,7 +14,6 @@ import ( "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/mwitkow/go-grpc-middleware" "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" "github.com/sercand/kuberesolver" "golang.org/x/net/context" "google.golang.org/grpc" @@ -44,18 +43,9 @@ func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc. 
return nil, err } toHeader(r.Headers, req.Header) - if tracer := opentracing.GlobalTracer(); tracer != nil { - clientContext, err := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header)) - if err == nil { - span := tracer.StartSpan("httpgrpc", ext.RPCServerOption(clientContext)) - defer span.Finish() - ctx = opentracing.ContextWithSpan(ctx, span) - } else if err != opentracing.ErrSpanContextNotFound { - logging.Global().Warnf("Failed to extract tracing headers from request: %v", err) - } - } req = req.WithContext(ctx) req.RequestURI = r.Url + recorder := httptest.NewRecorder() s.handler.ServeHTTP(recorder, req) resp := &httpgrpc.HTTPResponse{ diff --git a/vendor/github.com/weaveworks/common/logging/logging.go b/vendor/github.com/weaveworks/common/logging/logging.go index 744b76c7482..8f9b7940216 100644 --- a/vendor/github.com/weaveworks/common/logging/logging.go +++ b/vendor/github.com/weaveworks/common/logging/logging.go @@ -4,23 +4,26 @@ import ( "fmt" "os" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" + "github.com/weaveworks/promrus" ) -// Setup configures logging output to stderr, sets the log level and sets the formatter. +// Setup configures a global logrus logger to output to stderr. +// It populates the standard logrus logger as well as the global logging instance. func Setup(logLevel string) error { - log.SetOutput(os.Stderr) - level, err := log.ParseLevel(logLevel) + level, err := logrus.ParseLevel(logLevel) if err != nil { - return fmt.Errorf("Error parsing log level: %v", err) + return fmt.Errorf("error parsing log level: %v", err) } - log.SetLevel(level) - log.SetFormatter(&textFormatter{}) hook, err := promrus.NewPrometheusHook() // Expose number of log messages as Prometheus metrics. 
if err != nil { return err } - log.AddHook(hook) + logrus.SetOutput(os.Stderr) + logrus.SetLevel(level) + logrus.SetFormatter(&textFormatter{}) + logrus.AddHook(hook) + SetGlobal(Logrus(logrus.StandardLogger())) return nil } diff --git a/vendor/github.com/weaveworks/common/logging/logrus.go b/vendor/github.com/weaveworks/common/logging/logrus.go index 7896b358bb5..117469c1d12 100644 --- a/vendor/github.com/weaveworks/common/logging/logrus.go +++ b/vendor/github.com/weaveworks/common/logging/logrus.go @@ -28,29 +28,29 @@ type logrusLogger struct { } func (l logrusLogger) WithField(key string, value interface{}) Interface { - return logusEntry{ + return logrusEntry{ Entry: l.Logger.WithField(key, value), } } func (l logrusLogger) WithFields(fields Fields) Interface { - return logusEntry{ + return logrusEntry{ Entry: l.Logger.WithFields(map[string]interface{}(fields)), } } -type logusEntry struct { +type logrusEntry struct { *logrus.Entry } -func (l logusEntry) WithField(key string, value interface{}) Interface { - return logusEntry{ +func (l logrusEntry) WithField(key string, value interface{}) Interface { + return logrusEntry{ Entry: l.Entry.WithField(key, value), } } -func (l logusEntry) WithFields(fields Fields) Interface { - return logusEntry{ +func (l logrusEntry) WithFields(fields Fields) Interface { + return logrusEntry{ Entry: l.Entry.WithFields(map[string]interface{}(fields)), } }