diff --git a/.gitignore b/.gitignore index e0c01dc65c5..12df738aa72 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ cmd/configs/configs cmd/distributor/distributor cmd/ingester/ingester cmd/querier/querier +cmd/query-frontend/query-frontend cmd/ruler/ruler cmd/table-manager/table-manager cmd/lite/lite @@ -10,5 +11,6 @@ cmd/lite/lite .pkg .cache pkg/ingester/client/cortex.pb.go +pkg/querier/frontend/frontend.pb.go pkg/ring/ring.pb.go images/ diff --git a/Makefile b/Makefile index 432747c352e..3daf7e49cd0 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ images: @echo > /dev/null # Generating proto code is automated. -PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) +PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.proto PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS)) # Building binaries is now automated. The convention is to build a binary diff --git a/cmd/querier/main.go b/cmd/querier/main.go index ac2054708c8..86091978f03 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/web/api/v1" "github.com/prometheus/tsdb" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" "github.com/weaveworks/common/tracing" @@ -21,6 +22,7 @@ import ( "github.com/weaveworks/cortex/pkg/chunk/storage" "github.com/weaveworks/cortex/pkg/distributor" "github.com/weaveworks/cortex/pkg/querier" + "github.com/weaveworks/cortex/pkg/querier/frontend" "github.com/weaveworks/cortex/pkg/ring" "github.com/weaveworks/cortex/pkg/util" ) @@ -39,9 +41,10 @@ func main() { chunkStoreConfig chunk.StoreConfig schemaConfig chunk.SchemaConfig storageConfig storage.Config + workerConfig frontend.WorkerConfig ) util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &querierConfig, - &chunkStoreConfig, &schemaConfig, &storageConfig) + &chunkStoreConfig, &schemaConfig, &storageConfig, &workerConfig) flag.Parse() // Setting the environment variable JAEGER_AGENT_HOST enables tracing @@ -86,6 +89,14 @@ func main() { } defer chunkStore.Stop() + // TODO this avoids our middleware for logging and latecy collection. + worker, err := frontend.NewWorker(workerConfig, httpgrpc_server.NewServer(server.HTTP), util.Logger) + if err != nil { + level.Error(util.Logger).Log("err", err) + os.Exit(1) + } + defer worker.Stop() + queryable, engine := querier.Make(querierConfig, dist, chunkStore) api := v1.NewAPI( engine, diff --git a/cmd/query-frontend/Dockerfile b/cmd/query-frontend/Dockerfile new file mode 100644 index 00000000000..526837146c0 --- /dev/null +++ b/cmd/query-frontend/Dockerfile @@ -0,0 +1,10 @@ +FROM alpine:3.8 +RUN apk add --no-cache ca-certificates +COPY query-frontend /bin/query-frontend +EXPOSE 80 +ENTRYPOINT [ "/bin/query-frontend" ] + +ARG revision +LABEL org.opencontainers.image.title="query-frontend" \ + org.opencontainers.image.source="https://github.com/weaveworks/cortex/tree/master/cmd/query-frontend" \ + org.opencontainers.image.revision="${revision}" diff --git a/cmd/query-frontend/main.go b/cmd/query-frontend/main.go new file mode 100644 index 00000000000..550c84b0a3c --- /dev/null +++ b/cmd/query-frontend/main.go @@ -0,0 +1,53 @@ +package main + +import ( + "flag" + "os" + + "github.com/go-kit/kit/log/level" + "google.golang.org/grpc" + + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/server" + "github.com/weaveworks/common/tracing" + "github.com/weaveworks/cortex/pkg/querier/frontend" + "github.com/weaveworks/cortex/pkg/util" +) + +func main() { + var ( + serverConfig = server.Config{ + MetricsNamespace: "cortex", + GRPCMiddleware: []grpc.UnaryServerInterceptor{ + middleware.ServerUserHeaderInterceptor, + }, + } + frontendConfig frontend.Config + ) + util.RegisterFlags(&serverConfig, &frontendConfig) + flag.Parse() + + // Setting the environment variable JAEGER_AGENT_HOST enables tracing + trace := tracing.NewFromEnv("query-frontend") + defer trace.Close() + + util.InitLogger(&serverConfig) + + server, err := server.New(serverConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing server", "err", err) + os.Exit(1) + } + defer server.Shutdown() + + f, err := frontend.New(frontendConfig, util.Logger) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing frontend", "err", err) + os.Exit(1) + } + defer f.Close() + + frontend.RegisterFrontendServer(server.GRPC, f) + server.HTTP.PathPrefix("/api/prom").Handler(middleware.AuthenticateUser.Wrap(f)) + server.Run() +} diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 00000000000..e08669ca6d8 --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,28 @@ +# Cortex Architecture + +*NB this document is a work-in-progress.* + +The Cortex architecture consists of multiple, horizontally scalable microservices. Each microservice uses the most appropriate technique for horizontal scaling; most are stateless and can handle requests for any users, and some (the ingesters) are semi-stateful and depend on consistent hashing. + +For more details on the Cortex architecture, you should read / watch: +- The original design doc "[Project Frankenstein: A multi tenant, scale out Prometheus](https://docs.google.com/document/d/1C7yhMnb1x2sfeoe45f4mnnKConvroWhJ8KQZwIHJOuw/edit#heading=h.nimsq29kl184)" +- PromCon 2016 Talk: "[Multitenant, Scale-Out Prometheus](https://promcon.io/2016-berlin/talks/multitenant-scale-out-prometheus/)" +- KubeCon Prometheus Day talk "Weave Cortex: Multi-tenant, horizontally scalable Prometheus as a Service" [slides](http://www.slideshare.net/weaveworks/weave-cortex-multitenant-horizontally-scalable-prometheus-as-a-service) [video](https://www.youtube.com/watch?v=9Uctgnazfwk) +- PromCon 2017 Talk: "[Cortex: Prometheus as a Service, One Year On](https://promcon.io/2017-munich/talks/cortex-prometheus-as-a-service-one-year-on/)" +- CNCF TOC Presentation; "Horizontally Scalable, Multi-tenant Prometheus" [slides](https://docs.google.com/presentation/d/190oIFgujktVYxWZLhLYN4q8p9dtQYoe4sxHgn4deBSI/edit#slide=id.g3b8e2d6f7e_0_6) + +## Query Path + +### Query Frontend + +The query frontend is an optional job which accepts HTTP requests and queues them by tenant ID, retrying them on errors. This allow for the occasional large query which would otherwise cause a querier OOM, allowing us to over-provision querier parallelism. Also, it prevents multiple large requests from being convoyed on a single querier by distributing them FIFO across all queriers. And finally, it prevent a single tenant from DoSing other tenants by fairly scheduling queries between tenants. + +The query frontend job accepts gRPC streaming requests from the queriers, which then "pull" requests from the frontend. For HA it is recommended you run multiple frontends - the queriers will connect to (and pull requests from) all of them. To get the benefit of the fair scheduling, it is recommended you run fewer frontends than queriers - two should suffice. + +See the document "[Cortex Query Woes](https://docs.google.com/document/d/1lsvSkv0tiAMPQv-V8vI2LZ8f4i9JuTRsuPI_i-XcAqY)" for more details design discussion. In the future, query splitting, query alignment and query results caching will be added to the frontend. + +The query frontend is completely optional - you can continue to use the queriers directly. If you want to use the query frontend, direct incoming authenticated traffic at them and set the `-querier.frontend-address` flag on the queriers. + +### Queriers + +The queriers handled the actual PromQL evaluation. They embed the chunk store client code for fetching data from long-term storage, and communicate with the ingesters for more recent data. diff --git a/pkg/querier/config.go b/pkg/querier/config.go index f19f784a973..a72c8f0d17b 100644 --- a/pkg/querier/config.go +++ b/pkg/querier/config.go @@ -18,7 +18,7 @@ type Config struct { Iterators bool } -// RegisterFlags adds the flags required to config this to the given FlagSet +// RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.") f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.") diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go new file mode 100644 index 00000000000..c7332e9d83c --- /dev/null +++ b/pkg/querier/frontend/frontend.go @@ -0,0 +1,243 @@ +package frontend + +import ( + "context" + "flag" + "math/rand" + "net/http" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" + "github.com/weaveworks/common/user" +) + +var ( + queueDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "query_frontend_queue_duration_seconds", + Help: "Time spend by requests queued.", + Buckets: prometheus.DefBuckets, + }) + retries = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "query_frontend_retries", + Help: "Number of times a request is retried.", + Buckets: []float64{0, 1, 2, 3, 4, 5}, + }) + queueLength = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "query_frontend_queue_length", + Help: "Number of queries in the queue.", + }) + + errServerClosing = httpgrpc.Errorf(http.StatusTeapot, "server closing down") + errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests") + errCanceled = httpgrpc.Errorf(http.StatusInternalServerError, "context cancelled") +) + +// Config for a Frontend. +type Config struct { + MaxOutstandingPerTenant int + MaxRetries int +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.") + f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyon this, the downstream error is returned.") +} + +// Frontend queues HTTP requests, dispatches them to backends, and handles retries +// for requests which failed. +type Frontend struct { + cfg Config + log log.Logger + + mtx sync.Mutex + cond *sync.Cond + queues map[string]chan *request +} + +type request struct { + enqueueTime time.Time + request *httpgrpc.HTTPRequest + err chan error + response chan *httpgrpc.HTTPResponse +} + +// New creates a new frontend. +func New(cfg Config, log log.Logger) (*Frontend, error) { + f := &Frontend{ + cfg: cfg, + log: log, + queues: map[string]chan *request{}, + } + f.cond = sync.NewCond(&f.mtx) + return f, nil +} + +// Close stops new requests and errors out any pending requests. +func (f *Frontend) Close() { + f.mtx.Lock() + defer f.mtx.Unlock() + for len(f.queues) > 0 { + f.cond.Wait() + } +} + +// ServeHTTP serves HTTP requests. +func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if err := f.serveHTTP(w, r); err != nil { + server.WriteError(w, err) + } +} + +func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { + ctx := r.Context() + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return err + } + + req, err := server.HTTPRequest(r) + if err != nil { + return err + } + + request := &request{ + request: req, + // Buffer of 1 to ensure response can be written even if client has gone away. + err: make(chan error, 1), + response: make(chan *httpgrpc.HTTPResponse, 1), + } + + var lastErr error + for tries := 0; tries < f.cfg.MaxRetries; tries++ { + if err := f.queueRequest(userID, request); err != nil { + return err + } + + var resp *httpgrpc.HTTPResponse + select { + case <-ctx.Done(): + // TODO propagate cancellation. + //request.Cancel() + return errCanceled + + case resp = <-request.response: + case lastErr = <-request.err: + resp, _ = httpgrpc.HTTPResponseFromError(lastErr) + } + + // Only retry is we get a HTTP 500 or non-HTTP error. + if resp == nil || resp.Code/100 == 5 { + level.Error(f.log).Log("msg", "error processing request", "try", tries, "err", lastErr, "resp", resp) + continue + } + + retries.Observe(float64(tries)) + server.WriteResponse(w, resp) + return nil + } + + return lastErr +} + +// Process allows backends to pull requests from the frontend. +func (f *Frontend) Process(server Frontend_ProcessServer) error { + + // If this request is canceled, ping the condition to unblock. This is done + // once, here (instead of in getNextRequest) as we expect calls to Process to + // process many requests. + go func() { + <-server.Context().Done() + f.cond.Broadcast() + }() + + for { + request, err := f.getNextRequest(server.Context()) + if err != nil { + return err + } + + if err := server.Send(&ProcessRequest{ + HttpRequest: request.request, + }); err != nil { + request.err <- err + return err + } + + response, err := server.Recv() + if err != nil { + request.err <- err + return err + } + + request.response <- response.HttpResponse + } +} + +func (f *Frontend) queueRequest(userID string, req *request) error { + req.enqueueTime = time.Now() + + f.mtx.Lock() + defer f.mtx.Unlock() + + queue, ok := f.queues[userID] + if !ok { + queue = make(chan *request, f.cfg.MaxOutstandingPerTenant) + f.queues[userID] = queue + } + + select { + case queue <- req: + queueLength.Add(1) + f.cond.Broadcast() + return nil + default: + return errTooManyRequest + } +} + +// getQueue picks a random queue and takes the next request off of it, so we +// fairly process users queries. Will block if there are no requests. +func (f *Frontend) getNextRequest(ctx context.Context) (*request, error) { + f.mtx.Lock() + defer f.mtx.Unlock() + + for len(f.queues) == 0 && ctx.Err() == nil { + f.cond.Wait() + } + + if err := ctx.Err(); err != nil { + return nil, err + } + + i, n := 0, rand.Intn(len(f.queues)) + for userID, queue := range f.queues { + if i < n { + i++ + continue + } + + request := <-queue + if len(queue) == 0 { + delete(f.queues, userID) + } + + // Tell close() we've processed a request. + f.cond.Broadcast() + + queueDuration.Observe(time.Now().Sub(request.enqueueTime).Seconds()) + queueLength.Add(-1) + return request, nil + } + + panic("should never happen") +} diff --git a/pkg/querier/frontend/frontend.proto b/pkg/querier/frontend/frontend.proto new file mode 100644 index 00000000000..9b2703ef1a9 --- /dev/null +++ b/pkg/querier/frontend/frontend.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package frontend; + +option go_package = "frontend"; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +service Frontend { + rpc Process(stream ProcessResponse) returns (stream ProcessRequest) {}; +} + +message ProcessRequest { + httpgrpc.HTTPRequest httpRequest = 1; +} + +message ProcessResponse { + httpgrpc.HTTPResponse httpResponse = 1; +} diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go new file mode 100644 index 00000000000..01971f79b52 --- /dev/null +++ b/pkg/querier/frontend/frontend_test.go @@ -0,0 +1,110 @@ +package frontend + +import ( + "context" + "fmt" + "io/ioutil" + "net" + "net/http" + "sync/atomic" + "testing" + + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" + "github.com/weaveworks/common/middleware" + "google.golang.org/grpc" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/util" +) + +func TestFrontend(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("Hello World")) + }) + test := func(addr string) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil) + require.NoError(t, err) + err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + assert.Equal(t, "Hello World", string(body)) + } + testFrontend(t, handler, test) +} + +func TestFrontendRetries(t *testing.T) { + try := int32(0) + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&try, 1) == 5 { + w.Write([]byte("Hello World")) + return + } + + w.WriteHeader(http.StatusInternalServerError) + }) + test := func(addr string) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil) + require.NoError(t, err) + err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + assert.Equal(t, "Hello World", string(body)) + } + testFrontend(t, handler, test) +} + +func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { + logger := log.NewNopLogger() //log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + + var ( + config Config + workerConfig WorkerConfig + ) + util.DefaultValues(&config, &workerConfig) + + grpcListen, err := net.Listen("tcp", "") + require.NoError(t, err) + workerConfig.Address = grpcListen.Addr().String() + + httpListen, err := net.Listen("tcp", "") + require.NoError(t, err) + + frontend, err := New(config, logger) + require.NoError(t, err) + + grpcServer := grpc.NewServer() + defer grpcServer.GracefulStop() + + RegisterFrontendServer(grpcServer, frontend) + + httpServer := http.Server{ + Handler: middleware.AuthenticateUser.Wrap(frontend), + } + defer httpServer.Shutdown(context.Background()) + + go httpServer.Serve(httpListen) + go grpcServer.Serve(grpcListen) + + worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger) + require.NoError(t, err) + defer worker.Stop() + + test(httpListen.Addr().String()) +} diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go new file mode 100644 index 00000000000..03385aaa1d3 --- /dev/null +++ b/pkg/querier/frontend/worker.go @@ -0,0 +1,224 @@ +package frontend + +import ( + "context" + "flag" + "net/http" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/mwitkow/go-grpc-middleware" + "github.com/opentracing/opentracing-go" + "google.golang.org/grpc" + "google.golang.org/grpc/naming" + + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/cortex/pkg/util" +) + +var ( + backoffConfig = util.BackoffConfig{ + MinBackoff: 50 * time.Millisecond, + MaxBackoff: 1 * time.Second, + } +) + +// WorkerConfig is config for a worker. +type WorkerConfig struct { + Address string + Parallelism int + DNSLookupDuration time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service.") + f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process.") + f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.") +} + +// Worker is the counter-part to the frontend, actually processing requests. +type Worker interface { + Stop() +} + +type worker struct { + cfg WorkerConfig + log log.Logger + server *server.Server + + ctx context.Context + cancel context.CancelFunc + watcher naming.Watcher + wg sync.WaitGroup +} + +type noopWorker struct { +} + +func (noopWorker) Stop() {} + +// NewWorker creates a new Worker. +func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker, error) { + if cfg.Address == "" { + level.Info(log).Log("msg", "no address specified, not starting worker") + return noopWorker{}, nil + } + + resolver, err := naming.NewDNSResolverWithFreq(cfg.DNSLookupDuration) + if err != nil { + return nil, err + } + + watcher, err := resolver.Resolve(cfg.Address) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + + w := &worker{ + cfg: cfg, + log: log, + server: server, + + ctx: ctx, + cancel: cancel, + watcher: watcher, + } + w.wg.Add(1) + go w.watchDNSLoop() + return w, nil +} + +// Stop the worker. +func (w *worker) Stop() { + w.watcher.Close() + w.cancel() + w.wg.Wait() +} + +// watchDNSLoop watches for changes in DNS and starts or stops workers. +func (w *worker) watchDNSLoop() { + defer w.wg.Done() + + cancels := map[string]context.CancelFunc{} + defer func() { + for _, cancel := range cancels { + cancel() + } + }() + + for { + updates, err := w.watcher.Next() + if err != nil { + level.Error(w.log).Log("msg", "error from DNS watcher", "err", err) + return + } + + for _, update := range updates { + switch update.Op { + case naming.Add: + level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr) + ctx, cancel := context.WithCancel(w.ctx) + cancels[update.Addr] = cancel + w.runMany(ctx, update.Addr) + + case naming.Delete: + level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) + if cancel, ok := cancels[update.Addr]; ok { + cancel() + } + + default: + panic("unknown op") + } + } + } +} + +// runMany starts N runOne loops for a given address. +func (w *worker) runMany(ctx context.Context, address string) { + client, err := connect(address) + if err != nil { + level.Error(w.log).Log("msg", "error connecting", "addr", address, "err", err) + return + } + + w.wg.Add(w.cfg.Parallelism) + for i := 0; i < w.cfg.Parallelism; i++ { + go w.runOne(ctx, client) + } +} + +// runOne loops, trying to establish a stream to the frontend to begin +// request processing. +func (w *worker) runOne(ctx context.Context, client FrontendClient) { + defer w.wg.Done() + + backoff := util.NewBackoff(ctx, backoffConfig) + for backoff.Ongoing() { + c, err := client.Process(ctx) + if err != nil { + level.Error(w.log).Log("msg", "error contacting frontend", "err", err) + backoff.Wait() + continue + } + + if err := w.process(ctx, c); err != nil { + level.Error(w.log).Log("msg", "error processing requests", "err", err) + backoff.Wait() + continue + } + + backoff.Reset() + } +} + +// process loops processing requests on an established stream. +func (w *worker) process(ctx context.Context, c Frontend_ProcessClient) error { + for { + request, err := c.Recv() + if err != nil { + return err + } + + response, err := w.server.Handle(ctx, request.HttpRequest) + if err != nil { + var ok bool + response, ok = httpgrpc.HTTPResponseFromError(err) + if !ok { + response = &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Body: []byte(err.Error()), + } + } + } + + if err := c.Send(&ProcessResponse{ + HttpResponse: response, + }); err != nil { + return err + } + } +} + +func connect(address string) (FrontendClient, error) { + conn, err := grpc.Dial( + address, + grpc.WithInsecure(), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + )), + ) + if err != nil { + return nil, err + } + return NewFrontendClient(conn), nil +} diff --git a/vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.pb.go b/vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.pb.go index a617fc85781..52c2cea0d32 100644 --- a/vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.pb.go +++ b/vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.pb.go @@ -1,28 +1,32 @@ -// Code generated by protoc-gen-go. +// Code generated by protoc-gen-gogo. DO NOT EDIT. // source: github.com/weaveworks/common/httpgrpc/httpgrpc.proto -// DO NOT EDIT! /* -Package httpgrpc is a generated protocol buffer package. + Package httpgrpc is a generated protocol buffer package. -It is generated from these files: - github.com/weaveworks/common/httpgrpc/httpgrpc.proto + It is generated from these files: + github.com/weaveworks/common/httpgrpc/httpgrpc.proto -It has these top-level messages: - HTTPRequest - HTTPResponse - Header + It has these top-level messages: + HTTPRequest + HTTPResponse + Header */ package httpgrpc -import proto "github.com/golang/protobuf/proto" +import proto "github.com/gogo/protobuf/proto" import fmt "fmt" import math "math" -import ( - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" -) +import bytes "bytes" + +import strings "strings" +import reflect "reflect" + +import context "golang.org/x/net/context" +import grpc "google.golang.org/grpc" + +import io "io" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -33,19 +37,18 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type HTTPRequest struct { - Method string `protobuf:"bytes,1,opt,name=method" json:"method,omitempty"` - Url string `protobuf:"bytes,2,opt,name=url" json:"url,omitempty"` + Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"` + Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"` Headers []*Header `protobuf:"bytes,3,rep,name=headers" json:"headers,omitempty"` Body []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"` } func (m *HTTPRequest) Reset() { *m = HTTPRequest{} } -func (m *HTTPRequest) String() string { return proto.CompactTextString(m) } func (*HTTPRequest) ProtoMessage() {} -func (*HTTPRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (*HTTPRequest) Descriptor() ([]byte, []int) { return fileDescriptorHttpgrpc, []int{0} } func (m *HTTPRequest) GetMethod() string { if m != nil { @@ -76,15 +79,14 @@ func (m *HTTPRequest) GetBody() []byte { } type HTTPResponse struct { - Code int32 `protobuf:"varint,1,opt,name=Code" json:"Code,omitempty"` + Code int32 `protobuf:"varint,1,opt,name=Code,json=code,proto3" json:"Code,omitempty"` Headers []*Header `protobuf:"bytes,2,rep,name=headers" json:"headers,omitempty"` Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"` } func (m *HTTPResponse) Reset() { *m = HTTPResponse{} } -func (m *HTTPResponse) String() string { return proto.CompactTextString(m) } func (*HTTPResponse) ProtoMessage() {} -func (*HTTPResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (*HTTPResponse) Descriptor() ([]byte, []int) { return fileDescriptorHttpgrpc, []int{1} } func (m *HTTPResponse) GetCode() int32 { if m != nil { @@ -108,14 +110,13 @@ func (m *HTTPResponse) GetBody() []byte { } type Header struct { - Key string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"` + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Values []string `protobuf:"bytes,2,rep,name=values" json:"values,omitempty"` } func (m *Header) Reset() { *m = Header{} } -func (m *Header) String() string { return proto.CompactTextString(m) } func (*Header) ProtoMessage() {} -func (*Header) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (*Header) Descriptor() ([]byte, []int) { return fileDescriptorHttpgrpc, []int{2} } func (m *Header) GetKey() string { if m != nil { @@ -136,6 +137,159 @@ func init() { proto.RegisterType((*HTTPResponse)(nil), "httpgrpc.HTTPResponse") proto.RegisterType((*Header)(nil), "httpgrpc.Header") } +func (this *HTTPRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*HTTPRequest) + if !ok { + that2, ok := that.(HTTPRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Method != that1.Method { + return false + } + if this.Url != that1.Url { + return false + } + if len(this.Headers) != len(that1.Headers) { + return false + } + for i := range this.Headers { + if !this.Headers[i].Equal(that1.Headers[i]) { + return false + } + } + if !bytes.Equal(this.Body, that1.Body) { + return false + } + return true +} +func (this *HTTPResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*HTTPResponse) + if !ok { + that2, ok := that.(HTTPResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Code != that1.Code { + return false + } + if len(this.Headers) != len(that1.Headers) { + return false + } + for i := range this.Headers { + if !this.Headers[i].Equal(that1.Headers[i]) { + return false + } + } + if !bytes.Equal(this.Body, that1.Body) { + return false + } + return true +} +func (this *Header) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Header) + if !ok { + that2, ok := that.(Header) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Key != that1.Key { + return false + } + if len(this.Values) != len(that1.Values) { + return false + } + for i := range this.Values { + if this.Values[i] != that1.Values[i] { + return false + } + } + return true +} +func (this *HTTPRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&httpgrpc.HTTPRequest{") + s = append(s, "Method: "+fmt.Sprintf("%#v", this.Method)+",\n") + s = append(s, "Url: "+fmt.Sprintf("%#v", this.Url)+",\n") + if this.Headers != nil { + s = append(s, "Headers: "+fmt.Sprintf("%#v", this.Headers)+",\n") + } + s = append(s, "Body: "+fmt.Sprintf("%#v", this.Body)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *HTTPResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&httpgrpc.HTTPResponse{") + s = append(s, "Code: "+fmt.Sprintf("%#v", this.Code)+",\n") + if this.Headers != nil { + s = append(s, "Headers: "+fmt.Sprintf("%#v", this.Headers)+",\n") + } + s = append(s, "Body: "+fmt.Sprintf("%#v", this.Body)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Header) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&httpgrpc.Header{") + s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") + s = append(s, "Values: "+fmt.Sprintf("%#v", this.Values)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringHttpgrpc(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} // Reference imports to suppress errors if they are not otherwise used. var _ context.Context @@ -209,27 +363,797 @@ var _HTTP_serviceDesc = grpc.ServiceDesc{ Metadata: "github.com/weaveworks/common/httpgrpc/httpgrpc.proto", } +func (m *HTTPRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *HTTPRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Method) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Method))) + i += copy(dAtA[i:], m.Method) + } + if len(m.Url) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Url))) + i += copy(dAtA[i:], m.Url) + } + if len(m.Headers) > 0 { + for _, msg := range m.Headers { + dAtA[i] = 0x1a + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Body) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Body))) + i += copy(dAtA[i:], m.Body) + } + return i, nil +} + +func (m *HTTPResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *HTTPResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Code != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(m.Code)) + } + if len(m.Headers) > 0 { + for _, msg := range m.Headers { + dAtA[i] = 0x12 + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Body) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Body))) + i += copy(dAtA[i:], m.Body) + } + return i, nil +} + +func (m *Header) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Header) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Key) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Key))) + i += copy(dAtA[i:], m.Key) + } + if len(m.Values) > 0 { + for _, s := range m.Values { + dAtA[i] = 0x12 + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + return i, nil +} + +func encodeVarintHttpgrpc(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *HTTPRequest) Size() (n int) { + var l int + _ = l + l = len(m.Method) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + l = len(m.Url) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + if len(m.Headers) > 0 { + for _, e := range m.Headers { + l = e.Size() + n += 1 + l + sovHttpgrpc(uint64(l)) + } + } + l = len(m.Body) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + return n +} + +func (m *HTTPResponse) Size() (n int) { + var l int + _ = l + if m.Code != 0 { + n += 1 + sovHttpgrpc(uint64(m.Code)) + } + if len(m.Headers) > 0 { + for _, e := range m.Headers { + l = e.Size() + n += 1 + l + sovHttpgrpc(uint64(l)) + } + } + l = len(m.Body) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + return n +} + +func (m *Header) Size() (n int) { + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + if len(m.Values) > 0 { + for _, s := range m.Values { + l = len(s) + n += 1 + l + sovHttpgrpc(uint64(l)) + } + } + return n +} + +func sovHttpgrpc(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozHttpgrpc(x uint64) (n int) { + return sovHttpgrpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *HTTPRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&HTTPRequest{`, + `Method:` + fmt.Sprintf("%v", this.Method) + `,`, + `Url:` + fmt.Sprintf("%v", this.Url) + `,`, + `Headers:` + strings.Replace(fmt.Sprintf("%v", this.Headers), "Header", "Header", 1) + `,`, + `Body:` + fmt.Sprintf("%v", this.Body) + `,`, + `}`, + }, "") + return s +} +func (this *HTTPResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&HTTPResponse{`, + `Code:` + fmt.Sprintf("%v", this.Code) + `,`, + `Headers:` + strings.Replace(fmt.Sprintf("%v", this.Headers), "Header", "Header", 1) + `,`, + `Body:` + fmt.Sprintf("%v", this.Body) + `,`, + `}`, + }, "") + return s +} +func (this *Header) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Header{`, + `Key:` + fmt.Sprintf("%v", this.Key) + `,`, + `Values:` + fmt.Sprintf("%v", this.Values) + `,`, + `}`, + }, "") + return s +} +func valueToStringHttpgrpc(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *HTTPRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HTTPRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HTTPRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Method", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Method = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Url", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Url = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Headers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Headers = append(m.Headers, &Header{}) + if err := m.Headers[len(m.Headers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Body", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Body = append(m.Body[:0], dAtA[iNdEx:postIndex]...) + if m.Body == nil { + m.Body = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHttpgrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHttpgrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *HTTPResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HTTPResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HTTPResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Code", wireType) + } + m.Code = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Code |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Headers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Headers = append(m.Headers, &Header{}) + if err := m.Headers[len(m.Headers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Body", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Body = append(m.Body[:0], dAtA[iNdEx:postIndex]...) + if m.Body == nil { + m.Body = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHttpgrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHttpgrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Header) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Header: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Header: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Values", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Values = append(m.Values, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHttpgrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHttpgrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipHttpgrpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthHttpgrpc + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipHttpgrpc(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthHttpgrpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowHttpgrpc = fmt.Errorf("proto: integer overflow") +) + func init() { - proto.RegisterFile("github.com/weaveworks/common/httpgrpc/httpgrpc.proto", fileDescriptor0) -} - -var fileDescriptor0 = []byte{ - // 264 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x91, 0x4f, 0x4f, 0x83, 0x40, - 0x10, 0xc5, 0x6d, 0x41, 0xb4, 0xd3, 0x1e, 0x9a, 0x4d, 0x6c, 0x88, 0x27, 0x42, 0x62, 0x24, 0x1e, - 0x20, 0x41, 0x2f, 0x1e, 0xd5, 0x0b, 0x47, 0xb3, 0xe9, 0xc9, 0x1b, 0x7f, 0x26, 0xc5, 0x14, 0x18, - 0xdc, 0x5d, 0x4a, 0xfa, 0xed, 0xcd, 0x2e, 0xb4, 0x12, 0x4f, 0xbd, 0xbd, 0x37, 0xbc, 0xf0, 0x9b, - 0xb7, 0x03, 0x2f, 0xbb, 0x6f, 0x55, 0x76, 0x59, 0x98, 0x53, 0x1d, 0xf5, 0x98, 0x1e, 0xb0, 0x27, - 0xb1, 0x97, 0x51, 0x4e, 0x75, 0x4d, 0x4d, 0x54, 0x2a, 0xd5, 0xee, 0x44, 0x9b, 0x9f, 0x45, 0xd8, - 0x0a, 0x52, 0xc4, 0x6e, 0x4f, 0xde, 0xef, 0x61, 0x99, 0x6c, 0xb7, 0x9f, 0x1c, 0x7f, 0x3a, 0x94, - 0x8a, 0x6d, 0xc0, 0xa9, 0x51, 0x95, 0x54, 0xb8, 0x33, 0x6f, 0x16, 0x2c, 0xf8, 0xe8, 0xd8, 0x1a, - 0xac, 0x4e, 0x54, 0xee, 0xdc, 0x0c, 0xb5, 0x64, 0x4f, 0x70, 0x53, 0x62, 0x5a, 0xa0, 0x90, 0xae, - 0xe5, 0x59, 0xc1, 0x32, 0x5e, 0x87, 0x67, 0x48, 0x62, 0x3e, 0xf0, 0x53, 0x80, 0x31, 0xb0, 0x33, - 0x2a, 0x8e, 0xae, 0xed, 0xcd, 0x82, 0x15, 0x37, 0xda, 0xcf, 0x60, 0x35, 0x80, 0x65, 0x4b, 0x8d, - 0x44, 0x9d, 0xf9, 0xa0, 0x02, 0x0d, 0xf7, 0x9a, 0x1b, 0x3d, 0x65, 0xcc, 0x2f, 0x65, 0x58, 0x13, - 0x46, 0x0c, 0xce, 0x10, 0xd3, 0xfb, 0xef, 0xf1, 0x38, 0x96, 0xd2, 0x52, 0x37, 0x3d, 0xa4, 0x55, - 0x87, 0xc3, 0xaf, 0x17, 0x7c, 0x74, 0xf1, 0x1b, 0xd8, 0x7a, 0x2f, 0xf6, 0x0a, 0x4e, 0x92, 0x36, - 0x45, 0x85, 0xec, 0x6e, 0x02, 0xfd, 0x7b, 0xaa, 0xfb, 0xcd, 0xff, 0xf1, 0x50, 0xc4, 0xbf, 0x7a, - 0x7f, 0xfc, 0x7a, 0xb8, 0xe8, 0x2a, 0x99, 0x63, 0xae, 0xf1, 0xfc, 0x1b, 0x00, 0x00, 0xff, 0xff, - 0x1c, 0x0f, 0x07, 0xc6, 0xc5, 0x01, 0x00, 0x00, + proto.RegisterFile("github.com/weaveworks/common/httpgrpc/httpgrpc.proto", fileDescriptorHttpgrpc) +} + +var fileDescriptorHttpgrpc = []byte{ + // 319 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x49, 0xcf, 0x2c, 0xc9, + 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f, 0x4f, 0x4d, 0x2c, 0x4b, 0x2d, 0xcf, 0x2f, 0xca, + 0x2e, 0xd6, 0x4f, 0xce, 0xcf, 0xcd, 0xcd, 0xcf, 0xd3, 0xcf, 0x28, 0x29, 0x29, 0x48, 0x2f, 0x2a, + 0x48, 0x86, 0x33, 0xf4, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, 0x85, 0x38, 0x60, 0x7c, 0xa5, 0x72, 0x2e, + 0x6e, 0x8f, 0x90, 0x90, 0x80, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0x21, 0x31, 0x2e, 0xb6, + 0xdc, 0xd4, 0x92, 0x8c, 0xfc, 0x14, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xce, 0x20, 0x28, 0x4f, 0x48, + 0x80, 0x8b, 0xb9, 0xb4, 0x28, 0x47, 0x82, 0x09, 0x2c, 0x08, 0x62, 0x0a, 0x69, 0x71, 0xb1, 0x67, + 0xa4, 0x26, 0xa6, 0xa4, 0x16, 0x15, 0x4b, 0x30, 0x2b, 0x30, 0x6b, 0x70, 0x1b, 0x09, 0xe8, 0xc1, + 0x2d, 0xf1, 0x00, 0x4b, 0x04, 0xc1, 0x14, 0x08, 0x09, 0x71, 0xb1, 0x24, 0xe5, 0xa7, 0x54, 0x4a, + 0xb0, 0x28, 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x4a, 0x49, 0x5c, 0x3c, 0x10, 0x8b, 0x8b, 0x0b, + 0xf2, 0xf3, 0x8a, 0x53, 0x41, 0x6a, 0x9c, 0xf3, 0x53, 0x52, 0xc1, 0xf6, 0xb2, 0x06, 0xb1, 0x24, + 0xe7, 0xa7, 0xa4, 0x22, 0xdb, 0xc1, 0x44, 0xac, 0x1d, 0xcc, 0x48, 0x76, 0x18, 0x71, 0xb1, 0x41, + 0x94, 0x81, 0xdc, 0x9f, 0x9d, 0x5a, 0x09, 0xf5, 0x14, 0x88, 0x09, 0xf2, 0x69, 0x59, 0x62, 0x4e, + 0x69, 0x2a, 0xc4, 0x68, 0xce, 0x20, 0x28, 0xcf, 0xc8, 0x91, 0x8b, 0x05, 0xe4, 0x2e, 0x21, 0x4b, + 0x2e, 0x36, 0x8f, 0xc4, 0xbc, 0x94, 0x9c, 0x54, 0x21, 0x51, 0x24, 0x4b, 0x11, 0x41, 0x25, 0x25, + 0x86, 0x2e, 0x0c, 0xf1, 0x88, 0x12, 0x83, 0x53, 0xf0, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, + 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, + 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, + 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0x88, 0x52, 0x25, 0x2a, 0x06, 0x93, 0xd8, 0xc0, 0x31, + 0x67, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x49, 0xec, 0x86, 0xd6, 0xf1, 0x01, 0x00, 0x00, } diff --git a/vendor/github.com/weaveworks/common/httpgrpc/server/server.go b/vendor/github.com/weaveworks/common/httpgrpc/server/server.go index 7acebb8b935..1fdb607e40d 100644 --- a/vendor/github.com/weaveworks/common/httpgrpc/server/server.go +++ b/vendor/github.com/weaveworks/common/httpgrpc/server/server.go @@ -66,7 +66,7 @@ func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc. if recorder.Code/100 == 5 { return nil, httpgrpc.ErrorFromHTTPResponse(resp) } - return resp, err + return resp, nil } // Client is a http.Handler that forwards the request over gRPC. @@ -143,13 +143,37 @@ func NewClient(address string) (*Client, error) { }, nil } -// ServeHTTP implements http.Handler -func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func HTTPRequest(r *http.Request) (*httpgrpc.HTTPRequest, error) { body, err := ioutil.ReadAll(r.Body) if err != nil { + return nil, err + } + return &httpgrpc.HTTPRequest{ + Method: r.Method, + Url: r.RequestURI, + Body: body, + Headers: fromHeader(r.Header), + }, nil +} + +func WriteResponse(w http.ResponseWriter, resp *httpgrpc.HTTPResponse) error { + toHeader(resp.Headers, w.Header()) + w.WriteHeader(int(resp.Code)) + _, err := w.Write(resp.Body) + return err +} + +func WriteError(w http.ResponseWriter, err error) { + resp, ok := httpgrpc.HTTPResponseFromError(err) + if ok { + WriteResponse(w, resp) + } else { http.Error(w, err.Error(), http.StatusInternalServerError) - return } +} + +// ServeHTTP implements http.Handler +func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { if tracer := opentracing.GlobalTracer(); tracer != nil { if span := opentracing.SpanFromContext(r.Context()); span != nil { if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header)); err != nil { @@ -157,13 +181,12 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } } - req := &httpgrpc.HTTPRequest{ - Method: r.Method, - Url: r.RequestURI, - Body: body, - Headers: fromHeader(r.Header), - } + req, err := HTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } resp, err := c.client.Handle(r.Context(), req) if err != nil { // Some errors will actually contain a valid resp, just need to unpack it @@ -176,9 +199,7 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - toHeader(resp.Headers, w.Header()) - w.WriteHeader(int(resp.Code)) - if _, err := w.Write(resp.Body); err != nil { + if err := WriteResponse(w, resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return }