From 837cce5b34310fd5aa858d1e756297098d5e541f Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Wed, 11 Jul 2018 18:51:21 -0400 Subject: [PATCH 01/13] Add unit tests for activator Refactored retryRoundTripper and activationHandler into multiple components to make testing simpler. Fixes #1231 --- cmd/activator/handlers.go | 130 +++++++++++++++++ cmd/activator/handlers_test.go | 205 +++++++++++++++++++++++++++ cmd/activator/main.go | 147 +++---------------- cmd/activator/round_trippers.go | 103 ++++++++++++++ cmd/activator/round_trippers_test.go | 152 ++++++++++++++++++++ 5 files changed, 612 insertions(+), 125 deletions(-) create mode 100644 cmd/activator/handlers.go create mode 100644 cmd/activator/handlers_test.go create mode 100644 cmd/activator/round_trippers.go create mode 100644 cmd/activator/round_trippers_test.go diff --git a/cmd/activator/handlers.go b/cmd/activator/handlers.go new file mode 100644 index 000000000000..2d5dba94604f --- /dev/null +++ b/cmd/activator/handlers.go @@ -0,0 +1,130 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httputil" + "net/url" + + "github.com/knative/serving/pkg/activator" + "github.com/knative/serving/pkg/controller" + "go.uber.org/zap" +) + +// activationHandler will proxy a request to the active endpoint for the specified revision, +// using the provided transport +type activationHandler struct { + activator activator.Activator + logger *zap.SugaredLogger + transport http.RoundTripper +} + +func newActivationHandler(a activator.Activator, rt http.RoundTripper, l *zap.SugaredLogger) http.Handler { + return &activationHandler{activator: a, transport: rt, logger: l} +} + +func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + namespace := r.Header.Get(controller.GetRevisionHeaderNamespace()) + name := r.Header.Get(controller.GetRevisionHeaderName()) + + endpoint, status, err := a.activator.ActiveEndpoint(namespace, name) + if err != nil { + msg := fmt.Sprintf("Error getting active endpoint: %v", err) + + a.logger.Errorf(msg) + http.Error(w, msg, int(status)) + + return + } + + target := &url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port), + } + + proxy := httputil.NewSingleHostReverseProxy(target) + proxy.Transport = a.transport + + // TODO: Clear the host to avoid 404's. + // https://github.com/knative/serving/issues/964 + r.Host = "" + + proxy.ServeHTTP(w, r) +} + +// uploadHandler wraps the provided handler with a request body that supports +// re-reading and prevents uploads larger than `maxUploadBytes` +type uploadHandler struct { + http.Handler + MaxUploadBytes int64 +} + +func newUploadHandler(h http.Handler, maxUploadBytes int64) http.Handler { + return uploadHandler{ + Handler: h, + MaxUploadBytes: maxUploadBytes, + } +} + +func (h uploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.ContentLength > h.MaxUploadBytes { + w.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + + // The request body cannot be read multiple times for retries. + // The workaround is to clone the request body into a byte reader + // so the body can be read multiple times. + r.Body = newRewinder(r.Body) + + h.Handler.ServeHTTP(w, r) +} + +// rewinder wraps a single-use `ReadCloser` into a `ReadCloser` that can be read multiple times +type rewinder struct { + rc io.ReadCloser + rs io.ReadSeeker +} + +func newRewinder(rc io.ReadCloser) io.ReadCloser { + return &rewinder{rc: rc} +} + +func (r *rewinder) Read(b []byte) (int, error) { + // On the first `Read()`, the contents of `rc` is read into a buffer `rs`. + // This buffer is used for all subsequent reads + if r.rs == nil { + buf, err := ioutil.ReadAll(r.rc) + if err != nil { + return 0, err + } + r.rc.Close() + + r.rs = bytes.NewReader(buf) + } + + return r.rs.Read(b) +} + +func (r *rewinder) Close() error { + // Rewind the buffer on `Close()` for the next call to `Read` + r.rs.Seek(0, io.SeekStart) + + return nil +} diff --git a/cmd/activator/handlers_test.go b/cmd/activator/handlers_test.go new file mode 100644 index 000000000000..2f59925d1395 --- /dev/null +++ b/cmd/activator/handlers_test.go @@ -0,0 +1,205 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package main + +import ( + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + + "github.com/knative/serving/pkg/activator" + "github.com/knative/serving/pkg/controller" + "go.uber.org/zap" +) + +type fakeActivator struct { + endpoint activator.Endpoint + namespace string + name string +} + +func newFakeActivator(namespace string, name string, server *httptest.Server) fakeActivator { + url, _ := url.Parse(server.URL) + host := url.Hostname() + port, _ := strconv.Atoi(url.Port()) + + return fakeActivator{ + endpoint: activator.Endpoint{FQDN: host, Port: int32(port)}, + namespace: namespace, + name: name, + } +} + +func (fa fakeActivator) ActiveEndpoint(namespace, name string) (activator.Endpoint, activator.Status, error) { + if namespace == fa.namespace && name == fa.name { + return fa.endpoint, http.StatusOK, nil + } + + return activator.Endpoint{}, http.StatusNotFound, errors.New("not found!") +} + +func (fa fakeActivator) Shutdown() { +} + +func TestActivationHandler(t *testing.T) { + logger := zap.NewExample().Sugar() + + errMsg := func(msg string) string { + return fmt.Sprintf("Error getting active endpoint: %v\n", msg) + } + + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, "everything good!") + }), + ) + defer server.Close() + + act := newFakeActivator("real-namespace", "real-name", server) + + examples := []struct { + label string + namespace string + name string + wantBody string + wantCode int + wantErr error + }{ + {"active endpoint", "real-namespace", "real-name", "everything good!", http.StatusOK, nil}, + {"no active endpoint", "fake-namespace", "fake-name", errMsg("not found!"), http.StatusNotFound, nil}, + {"request error", "real-namespace", "real-name", "", http.StatusBadGateway, errors.New("request error!")}, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + rt := roundTripperFunc(func(r *http.Request) (*http.Response, error) { + if r.Host != "" { + t.Errorf("Unexpected request host. Want %q, got %q", "", r.Host) + } + + if e.wantErr != nil { + return nil, e.wantErr + } + + return http.DefaultTransport.RoundTrip(r) + }) + + handler := newActivationHandler(act, rt, logger) + + resp := httptest.NewRecorder() + + req := httptest.NewRequest("POST", "http://example.com", nil) + req.Header.Set(controller.GetRevisionHeaderNamespace(), e.namespace) + req.Header.Set(controller.GetRevisionHeaderName(), e.name) + + handler.ServeHTTP(resp, req) + + if resp.Code != e.wantCode { + t.Errorf("Unexpected response status. Want %d, got %d", e.wantCode, resp.Code) + } + + gotBody, _ := ioutil.ReadAll(resp.Body) + if string(gotBody) != e.wantBody { + t.Errorf("Unexpected response body. Want %q, got %q", e.wantBody, gotBody) + } + }) + } +} + +func TestUploadHandler(t *testing.T) { + payload := "SAMPLE PAYLOAD" + + examples := []struct { + label string + maxUpload int + status int + }{ + {"under", len(payload) + 1, http.StatusOK}, + {"equal", len(payload), http.StatusOK}, + {"over", len(payload) - 1, http.StatusRequestEntityTooLarge}, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b1, _ := ioutil.ReadAll(r.Body) + r.Body.Close() + + b2, _ := ioutil.ReadAll(r.Body) + r.Body.Close() + + if string(b1) != payload || string(b2) != payload { + t.Errorf("Expected request body to be rereadable. Want %q, got %q and %q.", payload, b1, b2) + } + }) + handler := newUploadHandler(baseHandler, int64(e.maxUpload)) + + resp := httptest.NewRecorder() + req := httptest.NewRequest("POST", "http://example.com", bytes.NewBufferString(payload)) + + handler.ServeHTTP(resp, req) + + if resp.Code != e.status { + t.Errorf("Unexpected response status for payload %q. Want %d, got %d", payload, e.status, resp.Code) + } + }) + } +} + +type readCloser struct { + io.Reader + closed bool +} + +func (rc *readCloser) Close() error { + rc.closed = true + + return nil +} + +func TestRewinder(t *testing.T) { + str := "test string" + rc := &readCloser{bytes.NewBufferString(str), false} + rewinder := newRewinder(rc) + + b1, err := ioutil.ReadAll(rewinder) + if err != nil { + t.Errorf("Unexpected error reading b1: %v", err) + } + rewinder.Close() + + b2, err := ioutil.ReadAll(rewinder) + if err != nil { + t.Errorf("Unexpected error reading b2: %v", err) + } + rewinder.Close() + + if string(b1) != str { + t.Errorf("Unexpected str b1. Want %q, got %q", str, b1) + } + + if string(b2) != str { + t.Errorf("Unexpected str b2. Want %q, got %q", str, b2) + } + + if !rc.closed { + t.Errorf("Expected ReadCloser to be closed") + } +} diff --git a/cmd/activator/main.go b/cmd/activator/main.go index b8b07187a1e1..bab04af4ae8a 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -17,15 +17,9 @@ limitations under the License. package main import ( - "bytes" "flag" - "fmt" - "io" - "io/ioutil" "log" "net/http" - "net/http/httputil" - "net/url" "time" "github.com/knative/pkg/logging/logkey" @@ -34,9 +28,9 @@ import ( "github.com/knative/pkg/signals" "github.com/knative/serving/pkg/activator" clientset "github.com/knative/serving/pkg/client/clientset/versioned" - h2cutil "github.com/knative/serving/pkg/h2c" + "github.com/knative/serving/pkg/configmap" + "github.com/knative/serving/pkg/logging" - "github.com/knative/serving/pkg/reconciler" "github.com/knative/serving/pkg/system" "github.com/knative/serving/third_party/h2c" "go.opencensus.io/exporter/prometheus" @@ -47,124 +41,15 @@ import ( ) const ( - maxUploadBytes = 32e6 // 32MB - same as app engine - maxRetry = 60 - retryInterval = 1 * time.Second - logLevelKey = "activator" + maxUploadBytes = 32e6 // 32MB - same as app engine + maxRetry = 60 + retryInterval = 1 * time.Second + logLevelKey = "activator" + defaultMaxUploadBytes = 32e6 // 32MB - same as app engine + defaultMaxRetries = 60 + defaultRetryInterval = 1 * time.Second ) -type activationHandler struct { - act activator.Activator - logger *zap.SugaredLogger - reporter activator.StatsReporter -} - -// retryRoundTripper retries on 503's for up to 60 seconds. The reason is there is -// a small delay for k8s to include the ready IP in service. -// https://github.com/knative/serving/issues/660#issuecomment-384062553 -type retryRoundTripper struct { - logger *zap.SugaredLogger - reporter activator.StatsReporter - start time.Time -} - -func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - var err error - var reqBody *bytes.Reader - - transport := http.DefaultTransport - - if r.ProtoMajor == 2 { - transport = h2cutil.NewTransport() - } - - if r.Body != nil { - reqBytes, err := ioutil.ReadAll(r.Body) - - if err != nil { - rrt.logger.Errorf("Error reading request body: %s", err) - return nil, err - } - - reqBody = bytes.NewReader(reqBytes) - r.Body = ioutil.NopCloser(reqBody) - } - - resp, err := transport.RoundTrip(r) - // TODO: Activator should retry with backoff. - // https://github.com/knative/serving/issues/1229 - i := 1 - for ; i < maxRetry; i++ { - if err == nil && resp != nil && resp.StatusCode != 503 { - break - } - - if err != nil { - rrt.logger.Errorf("Error making a request: %s", err) - } - - if resp != nil { - resp.Body.Close() - } - - time.Sleep(retryInterval) - - // The request body cannot be read multiple times for retries. - // The workaround is to clone the request body into a byte reader - // so the body can be read multiple times. - if r.Body != nil { - reqBody.Seek(0, io.SeekStart) - } - - resp, err = transport.RoundTrip(r) - } - - if resp != nil { - rrt.logger.Infof("It took %d tries to get response code %d", i, resp.StatusCode) - namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace()) - name := r.Header.Get(reconciler.GetRevisionHeaderName()) - config := r.Header.Get(reconciler.GetConfigurationHeader()) - rrt.reporter.ReportResponseCount(namespace, config, name, resp.StatusCode, i, 1.0) - rrt.reporter.ReportResponseTime(namespace, config, name, resp.StatusCode, time.Now().Sub(rrt.start)) - } - return resp, err -} - -func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) { - namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace()) - name := r.Header.Get(reconciler.GetRevisionHeaderName()) - config := r.Header.Get(reconciler.GetConfigurationHeader()) - start := time.Now() - - if r.ContentLength > maxUploadBytes { - w.WriteHeader(http.StatusRequestEntityTooLarge) - a.reporter.ReportResponseCount(namespace, config, name, http.StatusRequestEntityTooLarge, 1, 1.0) - a.reporter.ReportResponseTime(namespace, config, name, http.StatusRequestEntityTooLarge, time.Now().Sub(start)) - return - } - - endpoint, status, err := a.act.ActiveEndpoint(namespace, config, name) - if err != nil { - msg := fmt.Sprintf("Error getting active endpoint: %v", err) - http.Error(w, msg, int(status)) - a.logger.Errorf(msg) - a.reporter.ReportResponseCount(namespace, config, name, int(status), 1, 1.0) - a.reporter.ReportResponseTime(namespace, config, name, int(status), time.Now().Sub(start)) - return - } - target := &url.URL{ - Scheme: "http", - Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port), - } - proxy := httputil.NewSingleHostReverseProxy(target) - proxy.Transport = &retryRoundTripper{ - logger: a.logger, - reporter: a.reporter, - start: start, - } - proxy.ServeHTTP(w, r) -} - func main() { flag.Parse() cm, err := configmap.Load("/etc/config-logging") @@ -208,7 +93,17 @@ func main() { a := activator.NewRevisionActivator(kubeClient, servingClient, logger, reporter) a = activator.NewDedupingActivator(a) - ah := &activationHandler{a, logger, reporter} + ah := &activationHandler{a, logger} + + // Retry on 503's for up to 60 seconds. The reason is there is + // a small delay for k8s to include the ready IP in service. + // https://github.com/knative/serving/issues/660#issuecomment-384062553 + rt := baseTransport + rt = newStatusFilterRoundTripper(rt, http.StatusServiceUnavailable) + rt = newRetryRoundTripper(rt, logger, defaultMaxRetries, defaultRetryInterval) + + ah := newActivationHandler(a, rt, logger) + ah = newUploadHandler(ah, defaultMaxUploadBytes) // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() @@ -229,4 +124,6 @@ func main() { mux.HandleFunc("/", ah.handler) mux.Handle("/metrics", promExporter) h2c.ListenAndServe(":8080", mux) + http.Handle("/", ah) + h2c.ListenAndServe(":8080", nil) } diff --git a/cmd/activator/round_trippers.go b/cmd/activator/round_trippers.go new file mode 100644 index 000000000000..9d7175d0ae74 --- /dev/null +++ b/cmd/activator/round_trippers.go @@ -0,0 +1,103 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "net/http" + "time" + + h2cutil "github.com/knative/serving/pkg/h2c" + "go.uber.org/zap" +) + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (rt roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return rt(r) +} + +// baseTransport will the appropriate transport for the request's http protocol version +var baseTransport http.RoundTripper = roundTripperFunc(func(r *http.Request) (*http.Response, error) { + var transport http.RoundTripper = http.DefaultTransport + if r.ProtoMajor == 2 { + transport = h2cutil.NewTransport() + } + + return transport.RoundTrip(r) +}) + +// statusFilterRoundTripper returns an error if the response contains one of the filtered statuses. +type statusFilterRoundTripper struct { + transport http.RoundTripper + statuses []int +} + +func newStatusFilterRoundTripper(rt http.RoundTripper, statuses ...int) http.RoundTripper { + return statusFilterRoundTripper{rt, statuses} +} + +func (rt statusFilterRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + resp, err := rt.transport.RoundTrip(r) + if err != nil { + return nil, err + } + + for _, status := range rt.statuses { + if resp.StatusCode == status { + resp.Body.Close() + + return nil, fmt.Errorf("Filtering %d", status) + } + } + + return resp, nil +} + +// retryRoundTripper retries a request on error up to `maxRetries` times, +// waiting `interval` milliseconds between retries +type retryRoundTripper struct { + logger *zap.SugaredLogger + maxRetries int + interval time.Duration + transport http.RoundTripper +} + +func newRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, mr int, i time.Duration) http.RoundTripper { + return retryRoundTripper{logger: l, maxRetries: mr, interval: i, transport: rt} +} + +func (rrt retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + resp, err := rrt.transport.RoundTrip(r) + // TODO: Activator should retry with backoff. + // https://github.com/knative/serving/issues/1229 + i := 1 + for ; i < rrt.maxRetries; i++ { + if err == nil { + break + } + + rrt.logger.Errorf("Error making a request: %s", err) + + time.Sleep(rrt.interval) + + resp, err = rrt.transport.RoundTrip(r) + } + + // TODO: add metrics for number of tries and the response code. + if resp != nil { + rrt.logger.Infof("It took %d tries to get response code %d", i, resp.StatusCode) + } + return resp, err +} diff --git a/cmd/activator/round_trippers_test.go b/cmd/activator/round_trippers_test.go new file mode 100644 index 000000000000..ca01ae73f56a --- /dev/null +++ b/cmd/activator/round_trippers_test.go @@ -0,0 +1,152 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package main + +import ( + "errors" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" + + "go.uber.org/zap" +) + +func TestRetryRoundTripper(t *testing.T) { + wantBody := "all good!" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(wantBody)) + })) + l := zap.NewExample().Sugar() + maxRetries := 3 + interval := 10 * time.Millisecond + + examples := []struct { + label string + retries int + wantErr bool + }{ + {"success", maxRetries, false}, + {"failure", maxRetries + 1, true}, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + var last time.Time + + gotRetries := 0 + rt := roundTripperFunc(func(r *http.Request) (*http.Response, error) { + gotRetries += 1 + + now := time.Now() + duration := now.Sub(last) + if duration < interval { + t.Errorf("Unexpected retry interval. Want %v, got %v", interval, duration) + } + last = now + + if gotRetries < e.retries { + + if r.Body != nil { + ioutil.ReadAll(r.Body) + r.Body.Close() + } + + return nil, errors.New("some error!") + } + + return http.DefaultTransport.RoundTrip(r) + }) + + rrt := newRetryRoundTripper(rt, l, maxRetries, interval) + req := httptest.NewRequest("", ts.URL, nil) + + resp, err := rrt.RoundTrip(req) + + wantRetries := maxRetries + if e.retries < wantRetries { + wantRetries = e.retries + } + + if gotRetries != wantRetries { + t.Errorf("Unexpected number of retries. Want %d, got %d", wantRetries, gotRetries) + } + + if e.wantErr { + if err == nil { + t.Errorf("Expected error") + } + } else { + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + gotBody, _ := ioutil.ReadAll(resp.Body) + if string(gotBody) != wantBody { + t.Errorf("Unexpected response. Want %q, got %q", wantBody, gotBody) + } + } + }) + } +} + +func TestStatusFilterRoundTripper(t *testing.T) { + testServer := func(status int) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(status) + })) + } + + goodRT := http.DefaultTransport + errorRT := roundTripperFunc(func(r *http.Request) (*http.Response, error) { + return nil, errors.New("some error") + }) + + filtered := []int{501, 502} + + examples := []struct { + label string + transport http.RoundTripper + status int + err error + }{ + {"filtered status", goodRT, 502, errors.New("Filtering 502")}, + {"unfiltered status", goodRT, 503, nil}, + {"transport error", errorRT, 200, errors.New("some error")}, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + ts := testServer(e.status) + defer ts.Close() + + rt := newStatusFilterRoundTripper(e.transport, filtered...) + + req := httptest.NewRequest("", ts.URL, nil) + resp, err := rt.RoundTrip(req) + + if e.err != nil { + if err.Error() != e.err.Error() { + t.Errorf("Unexpected error. Want %v, got %v", e.err, err) + } + } else { + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if resp.StatusCode != e.status { + t.Errorf("Unexpected response status. Want %d, got %d", e.status, resp.StatusCode) + } + } + }) + } +} From 095944ce402617c8b6249be4cfd68dedc5c8b7ce Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Wed, 25 Jul 2018 12:32:06 -0400 Subject: [PATCH 02/13] Use pointers in activator round trippers to reduce copying overhead --- cmd/activator/round_trippers.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/activator/round_trippers.go b/cmd/activator/round_trippers.go index 9d7175d0ae74..12b8a86568dd 100644 --- a/cmd/activator/round_trippers.go +++ b/cmd/activator/round_trippers.go @@ -45,10 +45,10 @@ type statusFilterRoundTripper struct { } func newStatusFilterRoundTripper(rt http.RoundTripper, statuses ...int) http.RoundTripper { - return statusFilterRoundTripper{rt, statuses} + return &statusFilterRoundTripper{rt, statuses} } -func (rt statusFilterRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { +func (rt *statusFilterRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { resp, err := rt.transport.RoundTrip(r) if err != nil { return nil, err @@ -75,10 +75,10 @@ type retryRoundTripper struct { } func newRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, mr int, i time.Duration) http.RoundTripper { - return retryRoundTripper{logger: l, maxRetries: mr, interval: i, transport: rt} + return &retryRoundTripper{logger: l, maxRetries: mr, interval: i, transport: rt} } -func (rrt retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { +func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { resp, err := rrt.transport.RoundTrip(r) // TODO: Activator should retry with backoff. // https://github.com/knative/serving/issues/1229 From 7e434e9d9cc901243a2b7fe1c91b6a2ad6b51feb Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Wed, 25 Jul 2018 13:38:19 -0400 Subject: [PATCH 03/13] Add h2c.DefaultTransport, similar to http.DefaultTransport --- cmd/activator/round_trippers.go | 4 ++-- cmd/queue/main.go | 2 +- pkg/h2c/transport.go | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/activator/round_trippers.go b/cmd/activator/round_trippers.go index 12b8a86568dd..c6d857dd5fd8 100644 --- a/cmd/activator/round_trippers.go +++ b/cmd/activator/round_trippers.go @@ -28,11 +28,11 @@ func (rt roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return rt(r) } -// baseTransport will the appropriate transport for the request's http protocol version +// baseTransport will use the appropriate transport for the request's http protocol version var baseTransport http.RoundTripper = roundTripperFunc(func(r *http.Request) (*http.Response, error) { var transport http.RoundTripper = http.DefaultTransport if r.ProtoMajor == 2 { - transport = h2cutil.NewTransport() + transport = h2cutil.DefaultTransport } return transport.RoundTrip(r) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index b9facf56b216..e5bf49b7f6cb 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -284,7 +284,7 @@ func main() { httpProxy = httputil.NewSingleHostReverseProxy(target) h2cProxy = httputil.NewSingleHostReverseProxy(target) - h2cProxy.Transport = h2cutil.NewTransport() + h2cProxy.Transport = h2cutil.DefaultTransport logger.Infof("Queue container is starting, concurrencyModel: %s", *concurrencyModel) config, err := rest.InClusterConfig() diff --git a/pkg/h2c/transport.go b/pkg/h2c/transport.go index 60b544cafc96..0b68f60e696c 100644 --- a/pkg/h2c/transport.go +++ b/pkg/h2c/transport.go @@ -18,4 +18,6 @@ func NewTransport() http.RoundTripper { return net.Dial(netw, addr) }, } -} \ No newline at end of file +} + +var DefaultTransport http.RoundTripper = NewTransport() From c42351530aaa1edf5926a62c44d119a107414684 Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Wed, 25 Jul 2018 18:33:06 -0400 Subject: [PATCH 04/13] Use pointer receivers for all handlers in cmd/activator --- cmd/activator/handlers.go | 4 ++-- cmd/activator/handlers_test.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/activator/handlers.go b/cmd/activator/handlers.go index 2d5dba94604f..1df809ea4461 100644 --- a/cmd/activator/handlers.go +++ b/cmd/activator/handlers.go @@ -76,13 +76,13 @@ type uploadHandler struct { } func newUploadHandler(h http.Handler, maxUploadBytes int64) http.Handler { - return uploadHandler{ + return &uploadHandler{ Handler: h, MaxUploadBytes: maxUploadBytes, } } -func (h uploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *uploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.ContentLength > h.MaxUploadBytes { w.WriteHeader(http.StatusRequestEntityTooLarge) return diff --git a/cmd/activator/handlers_test.go b/cmd/activator/handlers_test.go index 2f59925d1395..d01bdb236649 100644 --- a/cmd/activator/handlers_test.go +++ b/cmd/activator/handlers_test.go @@ -35,19 +35,19 @@ type fakeActivator struct { name string } -func newFakeActivator(namespace string, name string, server *httptest.Server) fakeActivator { +func newFakeActivator(namespace string, name string, server *httptest.Server) activator.Activator { url, _ := url.Parse(server.URL) host := url.Hostname() port, _ := strconv.Atoi(url.Port()) - return fakeActivator{ + return &fakeActivator{ endpoint: activator.Endpoint{FQDN: host, Port: int32(port)}, namespace: namespace, name: name, } } -func (fa fakeActivator) ActiveEndpoint(namespace, name string) (activator.Endpoint, activator.Status, error) { +func (fa *fakeActivator) ActiveEndpoint(namespace, name string) (activator.Endpoint, activator.Status, error) { if namespace == fa.namespace && name == fa.name { return fa.endpoint, http.StatusOK, nil } @@ -55,7 +55,7 @@ func (fa fakeActivator) ActiveEndpoint(namespace, name string) (activator.Endpoi return activator.Endpoint{}, http.StatusNotFound, errors.New("not found!") } -func (fa fakeActivator) Shutdown() { +func (fa *fakeActivator) Shutdown() { } func TestActivationHandler(t *testing.T) { From 7b8f356bc473c5cbed90f3631616a853c349407d Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Thu, 26 Jul 2018 10:41:30 -0400 Subject: [PATCH 05/13] Log both happy and sad paths in the activator --- cmd/activator/round_trippers.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/activator/round_trippers.go b/cmd/activator/round_trippers.go index c6d857dd5fd8..5a49727da4ec 100644 --- a/cmd/activator/round_trippers.go +++ b/cmd/activator/round_trippers.go @@ -96,8 +96,11 @@ func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) } // TODO: add metrics for number of tries and the response code. - if resp != nil { - rrt.logger.Infof("It took %d tries to get response code %d", i, resp.StatusCode) + if err == nil { + rrt.logger.Infof("Activation finished after %d attempt(s). Response code: %d", i, resp.StatusCode) + } else { + rrt.logger.Errorf("Activation failed after %d attempts. Last error: %v", i, err) } + return resp, err } From 396d4179b66b812746790c6be166ae8b813bf8bb Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Thu, 26 Jul 2018 18:02:13 -0400 Subject: [PATCH 06/13] Add explicit labels for table tests in activator --- cmd/activator/handlers_test.go | 45 ++++++++++++++++++++++++---- cmd/activator/round_trippers_test.go | 33 ++++++++++++++++---- 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/cmd/activator/handlers_test.go b/cmd/activator/handlers_test.go index d01bdb236649..ef44d819cc60 100644 --- a/cmd/activator/handlers_test.go +++ b/cmd/activator/handlers_test.go @@ -82,9 +82,30 @@ func TestActivationHandler(t *testing.T) { wantCode int wantErr error }{ - {"active endpoint", "real-namespace", "real-name", "everything good!", http.StatusOK, nil}, - {"no active endpoint", "fake-namespace", "fake-name", errMsg("not found!"), http.StatusNotFound, nil}, - {"request error", "real-namespace", "real-name", "", http.StatusBadGateway, errors.New("request error!")}, + { + label: "active endpoint", + namespace: "real-namespace", + name: "real-name", + wantBody: "everything good!", + wantCode: http.StatusOK, + wantErr: nil, + }, + { + label: "no active endpoint", + namespace: "fake-namespace", + name: "fake-name", + wantBody: errMsg("not found!"), + wantCode: http.StatusNotFound, + wantErr: nil, + }, + { + label: "request error", + namespace: "real-namespace", + name: "real-name", + wantBody: "", + wantCode: http.StatusBadGateway, + wantErr: errors.New("request error!"), + }, } for _, e := range examples { @@ -131,9 +152,21 @@ func TestUploadHandler(t *testing.T) { maxUpload int status int }{ - {"under", len(payload) + 1, http.StatusOK}, - {"equal", len(payload), http.StatusOK}, - {"over", len(payload) - 1, http.StatusRequestEntityTooLarge}, + { + label: "under", + maxUpload: len(payload) + 1, + status: http.StatusOK, + }, + { + label: "equal", + maxUpload: len(payload), + status: http.StatusOK, + }, + { + label: "over", + maxUpload: len(payload) - 1, + status: http.StatusRequestEntityTooLarge, + }, } for _, e := range examples { diff --git a/cmd/activator/round_trippers_test.go b/cmd/activator/round_trippers_test.go index ca01ae73f56a..00ca15bd5c02 100644 --- a/cmd/activator/round_trippers_test.go +++ b/cmd/activator/round_trippers_test.go @@ -37,8 +37,16 @@ func TestRetryRoundTripper(t *testing.T) { retries int wantErr bool }{ - {"success", maxRetries, false}, - {"failure", maxRetries + 1, true}, + { + label: "success", + retries: maxRetries, + wantErr: false, + }, + { + label: "failure", + retries: maxRetries + 1, + wantErr: true, + }, } for _, e := range examples { @@ -120,9 +128,24 @@ func TestStatusFilterRoundTripper(t *testing.T) { status int err error }{ - {"filtered status", goodRT, 502, errors.New("Filtering 502")}, - {"unfiltered status", goodRT, 503, nil}, - {"transport error", errorRT, 200, errors.New("some error")}, + { + label: "filtered status", + transport: goodRT, + status: 502, + err: errors.New("Filtering 502"), + }, + { + label: "unfiltered status", + transport: goodRT, + status: 503, + err: nil, + }, + { + label: "transport error", + transport: errorRT, + status: 200, + err: errors.New("some error"), + }, } for _, e := range examples { From 9d6568777bf5880c88f6a5d7e2f4a59d6edee2eb Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Fri, 27 Jul 2018 18:23:26 -0400 Subject: [PATCH 07/13] Add tests for http protocol selection in activator --- cmd/activator/main.go | 3 +- cmd/activator/round_trippers.go | 22 +++++----- cmd/activator/round_trippers_test.go | 61 ++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 11 deletions(-) diff --git a/cmd/activator/main.go b/cmd/activator/main.go index bab04af4ae8a..b631623eb6e6 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -30,6 +30,7 @@ import ( clientset "github.com/knative/serving/pkg/client/clientset/versioned" "github.com/knative/serving/pkg/configmap" + h2cutil "github.com/knative/serving/pkg/h2c" "github.com/knative/serving/pkg/logging" "github.com/knative/serving/pkg/system" "github.com/knative/serving/third_party/h2c" @@ -98,7 +99,7 @@ func main() { // Retry on 503's for up to 60 seconds. The reason is there is // a small delay for k8s to include the ready IP in service. // https://github.com/knative/serving/issues/660#issuecomment-384062553 - rt := baseTransport + rt := newHttpRoundTripper(http.DefaultTransport, h2cutil.DefaultTransport) rt = newStatusFilterRoundTripper(rt, http.StatusServiceUnavailable) rt = newRetryRoundTripper(rt, logger, defaultMaxRetries, defaultRetryInterval) diff --git a/cmd/activator/round_trippers.go b/cmd/activator/round_trippers.go index 5a49727da4ec..fa123fbe1b7e 100644 --- a/cmd/activator/round_trippers.go +++ b/cmd/activator/round_trippers.go @@ -18,25 +18,27 @@ import ( "net/http" "time" - h2cutil "github.com/knative/serving/pkg/h2c" "go.uber.org/zap" ) -type roundTripperFunc func(*http.Request) (*http.Response, error) +// httpRoundTripper will use the appropriate transport for the request's http protocol version +type httpRoundTripper struct { + v1 http.RoundTripper + v2 http.RoundTripper +} -func (rt roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { - return rt(r) +func newHttpRoundTripper(v1 http.RoundTripper, v2 http.RoundTripper) http.RoundTripper { + return &httpRoundTripper{v1: v1, v2: v2} } -// baseTransport will use the appropriate transport for the request's http protocol version -var baseTransport http.RoundTripper = roundTripperFunc(func(r *http.Request) (*http.Response, error) { - var transport http.RoundTripper = http.DefaultTransport +func (rt *httpRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + var t http.RoundTripper = rt.v1 if r.ProtoMajor == 2 { - transport = h2cutil.DefaultTransport + t = rt.v2 } - return transport.RoundTrip(r) -}) + return t.RoundTrip(r) +} // statusFilterRoundTripper returns an error if the response contains one of the filtered statuses. type statusFilterRoundTripper struct { diff --git a/cmd/activator/round_trippers_test.go b/cmd/activator/round_trippers_test.go index 00ca15bd5c02..98a12a50693f 100644 --- a/cmd/activator/round_trippers_test.go +++ b/cmd/activator/round_trippers_test.go @@ -23,6 +23,67 @@ import ( "go.uber.org/zap" ) +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (rt roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return rt(r) +} + +func TestHttpRoundTripper(t *testing.T) { + v1Flag := false + v1RT := roundTripperFunc(func(r *http.Request) (*http.Response, error) { + v1Flag = true + + return nil, nil + }) + + v2Flag := false + v2RT := roundTripperFunc(func(r *http.Request) (*http.Response, error) { + v2Flag = true + + return nil, nil + }) + + rt := newHttpRoundTripper(v1RT, v2RT) + + examples := []struct { + label string + protoMajor int + wantFlag *bool + }{ + { + label: "use default transport for http1", + protoMajor: 1, + wantFlag: &v1Flag, + }, + { + label: "use h2c transport for http2", + protoMajor: 2, + wantFlag: &v2Flag, + }, + { + label: "use default transport for all others", + protoMajor: 99, + wantFlag: &v1Flag, + }, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + v1Flag = false + v2Flag = false + + r := &http.Request{ProtoMajor: e.protoMajor} + + rt.RoundTrip(r) + + if *e.wantFlag != true { + t.Error("Wrong transport selected for request.") + } + }) + } +} + func TestRetryRoundTripper(t *testing.T) { wantBody := "all good!" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { From f41e5d4d4e8b3cedcc60390b2d2a6002aa6236dc Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Mon, 30 Jul 2018 16:42:06 -0400 Subject: [PATCH 08/13] Separate retry logic from request logic in cmd/activator/retryRoundTripper --- cmd/activator/round_trippers.go | 39 +++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/cmd/activator/round_trippers.go b/cmd/activator/round_trippers.go index fa123fbe1b7e..dcd19f9b5f3d 100644 --- a/cmd/activator/round_trippers.go +++ b/cmd/activator/round_trippers.go @@ -80,29 +80,34 @@ func newRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, mr int, i return &retryRoundTripper{logger: l, maxRetries: mr, interval: i, transport: rt} } -func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - resp, err := rrt.transport.RoundTrip(r) - // TODO: Activator should retry with backoff. - // https://github.com/knative/serving/issues/1229 - i := 1 - for ; i < rrt.maxRetries; i++ { - if err == nil { - break - } - - rrt.logger.Errorf("Error making a request: %s", err) +func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (resp *http.Response, err error) { + attempts := rrt.retry(func() bool { + resp, err = rrt.transport.RoundTrip(r) + if err != nil { + rrt.logger.Errorf("Error making a request: %s", err) - time.Sleep(rrt.interval) + return true + } - resp, err = rrt.transport.RoundTrip(r) - } + return false + }) // TODO: add metrics for number of tries and the response code. if err == nil { - rrt.logger.Infof("Activation finished after %d attempt(s). Response code: %d", i, resp.StatusCode) + rrt.logger.Infof("Activation finished after %d attempt(s). Response code: %d", attempts, resp.StatusCode) } else { - rrt.logger.Errorf("Activation failed after %d attempts. Last error: %v", i, err) + rrt.logger.Errorf("Activation failed after %d attempts. Last error: %v", attempts, err) + } + + return +} + +func (rrt *retryRoundTripper) retry(action func() bool) (attempts int) { + // TODO: Activator should retry with backoff. + // https://github.com/knative/serving/issues/1229 + for attempts = 1; attempts <= rrt.maxRetries && action(); attempts++ { + time.Sleep(rrt.interval) } - return resp, err + return } From dec60e7e7037c0fb5aad4fde438c3aa0c11d5a49 Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Tue, 31 Jul 2018 20:40:09 -0400 Subject: [PATCH 09/13] Refactor retryRoundTripper to take retry strategy and shouldRetry condition as parameters --- cmd/activator/main.go | 3 +- cmd/activator/round_trippers.go | 72 ++++---- cmd/activator/round_trippers_test.go | 245 ++++++++++++++------------- 3 files changed, 154 insertions(+), 166 deletions(-) diff --git a/cmd/activator/main.go b/cmd/activator/main.go index b631623eb6e6..0262e3d36ec7 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -100,8 +100,7 @@ func main() { // a small delay for k8s to include the ready IP in service. // https://github.com/knative/serving/issues/660#issuecomment-384062553 rt := newHttpRoundTripper(http.DefaultTransport, h2cutil.DefaultTransport) - rt = newStatusFilterRoundTripper(rt, http.StatusServiceUnavailable) - rt = newRetryRoundTripper(rt, logger, defaultMaxRetries, defaultRetryInterval) + rt = newRetryRoundTripper(rt, logger, linearRetryer(retryInterval, maxRetries), retry503) ah := newActivationHandler(a, rt, logger) ah = newUploadHandler(ah, defaultMaxUploadBytes) diff --git a/cmd/activator/round_trippers.go b/cmd/activator/round_trippers.go index dcd19f9b5f3d..e5f5d2a04d41 100644 --- a/cmd/activator/round_trippers.go +++ b/cmd/activator/round_trippers.go @@ -14,7 +14,6 @@ limitations under the License. package main import ( - "fmt" "net/http" "time" @@ -40,52 +39,51 @@ func (rt *httpRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { return t.RoundTrip(r) } -// statusFilterRoundTripper returns an error if the response contains one of the filtered statuses. -type statusFilterRoundTripper struct { - transport http.RoundTripper - statuses []int -} - -func newStatusFilterRoundTripper(rt http.RoundTripper, statuses ...int) http.RoundTripper { - return &statusFilterRoundTripper{rt, statuses} -} - -func (rt *statusFilterRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - resp, err := rt.transport.RoundTrip(r) - if err != nil { - return nil, err - } +type retryer func(func() bool) int - for _, status := range rt.statuses { - if resp.StatusCode == status { - resp.Body.Close() - - return nil, fmt.Errorf("Filtering %d", status) +func linearRetryer(interval time.Duration, maxRetries int) retryer { + return func(action func() bool) (retries int) { + for retries = 1; !action() && retries < maxRetries; retries++ { + time.Sleep(interval) } + return } +} - return resp, nil +type shouldRetryFunc func(*http.Response) bool + +func retry503(resp *http.Response) bool { + return resp.StatusCode == http.StatusServiceUnavailable } -// retryRoundTripper retries a request on error up to `maxRetries` times, -// waiting `interval` milliseconds between retries type retryRoundTripper struct { - logger *zap.SugaredLogger - maxRetries int - interval time.Duration - transport http.RoundTripper + logger *zap.SugaredLogger + transport http.RoundTripper + retry retryer + shouldRetry shouldRetryFunc } -func newRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, mr int, i time.Duration) http.RoundTripper { - return &retryRoundTripper{logger: l, maxRetries: mr, interval: i, transport: rt} +// retryRoundTripper retries a request on error or `shouldRetry` condition, using the given `retry` strategy +func newRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, r retryer, sr shouldRetryFunc) http.RoundTripper { + return &retryRoundTripper{ + logger: l, + transport: rt, + retry: r, + shouldRetry: sr, + } } func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (resp *http.Response, err error) { attempts := rrt.retry(func() bool { resp, err = rrt.transport.RoundTrip(r) + if err != nil { rrt.logger.Errorf("Error making a request: %s", err) + return true + } + if rrt.shouldRetry(resp) { + resp.Body.Close() return true } @@ -94,19 +92,9 @@ func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (resp *http.Response, e // TODO: add metrics for number of tries and the response code. if err == nil { - rrt.logger.Infof("Activation finished after %d attempt(s). Response code: %d", attempts, resp.StatusCode) + rrt.logger.Infof("Finished after %d attempt(s). Response code: %d", attempts, resp.StatusCode) } else { - rrt.logger.Errorf("Activation failed after %d attempts. Last error: %v", attempts, err) - } - - return -} - -func (rrt *retryRoundTripper) retry(action func() bool) (attempts int) { - // TODO: Activator should retry with backoff. - // https://github.com/knative/serving/issues/1229 - for attempts = 1; attempts <= rrt.maxRetries && action(); attempts++ { - time.Sleep(rrt.interval) + rrt.logger.Errorf("Failed after %d attempts. Last error: %v", attempts, err) } return diff --git a/cmd/activator/round_trippers_test.go b/cmd/activator/round_trippers_test.go index 98a12a50693f..a05301d1c5c0 100644 --- a/cmd/activator/round_trippers_test.go +++ b/cmd/activator/round_trippers_test.go @@ -14,9 +14,7 @@ package main import ( "errors" - "io/ioutil" "net/http" - "net/http/httptest" "testing" "time" @@ -30,54 +28,48 @@ func (rt roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { } func TestHttpRoundTripper(t *testing.T) { - v1Flag := false - v1RT := roundTripperFunc(func(r *http.Request) (*http.Response, error) { - v1Flag = true + wants := map[string]bool{} + frt := func(key string) http.RoundTripper { + return roundTripperFunc(func(r *http.Request) (*http.Response, error) { + wants[key] = true - return nil, nil - }) - - v2Flag := false - v2RT := roundTripperFunc(func(r *http.Request) (*http.Response, error) { - v2Flag = true - - return nil, nil - }) + return nil, nil + }) + } - rt := newHttpRoundTripper(v1RT, v2RT) + rt := newHttpRoundTripper(frt("v1"), frt("v2")) examples := []struct { label string protoMajor int - wantFlag *bool + want string }{ { label: "use default transport for http1", protoMajor: 1, - wantFlag: &v1Flag, + want: "v1", }, { label: "use h2c transport for http2", protoMajor: 2, - wantFlag: &v2Flag, + want: "v2", }, { label: "use default transport for all others", protoMajor: 99, - wantFlag: &v1Flag, + want: "v1", }, } for _, e := range examples { t.Run(e.label, func(t *testing.T) { - v1Flag = false - v2Flag = false + wants[e.want] = false r := &http.Request{ProtoMajor: e.protoMajor} rt.RoundTrip(r) - if *e.wantFlag != true { + if wants[e.want] != true { t.Error("Wrong transport selected for request.") } }) @@ -85,151 +77,160 @@ func TestHttpRoundTripper(t *testing.T) { } func TestRetryRoundTripper(t *testing.T) { - wantBody := "all good!" - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(wantBody)) - })) - l := zap.NewExample().Sugar() - maxRetries := 3 - interval := 10 * time.Millisecond + req := &http.Request{} + + goodStatus := 200 + badStatus := 500 + + resp := func(status int) *http.Response { + return &http.Response{StatusCode: status, Body: &readCloser{}} + } + + someErr := errors.New("some error") + + logger := zap.NewExample().Sugar() + shouldRetry := func(resp *http.Response) bool { + return resp.StatusCode == badStatus + } examples := []struct { - label string - retries int - wantErr bool + label string + wantResp *http.Response + wantErr error + wantRetry bool + wantBodyClosed bool }{ { - label: "success", - retries: maxRetries, - wantErr: false, + label: "no retry", + wantResp: resp(goodStatus), + wantErr: nil, + wantRetry: false, + wantBodyClosed: false, + }, + { + label: "retry on error", + wantResp: nil, + wantErr: someErr, + wantRetry: true, + wantBodyClosed: false, }, { - label: "failure", - retries: maxRetries + 1, - wantErr: true, + label: "retry on condition", + wantResp: resp(badStatus), + wantErr: nil, + wantRetry: true, + wantBodyClosed: true, }, } for _, e := range examples { t.Run(e.label, func(t *testing.T) { - var last time.Time - - gotRetries := 0 - rt := roundTripperFunc(func(r *http.Request) (*http.Response, error) { - gotRetries += 1 - - now := time.Now() - duration := now.Sub(last) - if duration < interval { - t.Errorf("Unexpected retry interval. Want %v, got %v", interval, duration) - } - last = now - - if gotRetries < e.retries { + transport := roundTripperFunc(func(r *http.Request) (*http.Response, error) { + return e.wantResp, e.wantErr + }) - if r.Body != nil { - ioutil.ReadAll(r.Body) - r.Body.Close() + retry := func(a func() bool) int { + if a() { + if !e.wantRetry { + t.Errorf("Unexpected retry.") } - return nil, errors.New("some error!") } - return http.DefaultTransport.RoundTrip(r) - }) + return 1 + } - rrt := newRetryRoundTripper(rt, l, maxRetries, interval) - req := httptest.NewRequest("", ts.URL, nil) + rt := newRetryRoundTripper(transport, logger, retry, shouldRetry) - resp, err := rrt.RoundTrip(req) + gotResp, gotErr := rt.RoundTrip(req) - wantRetries := maxRetries - if e.retries < wantRetries { - wantRetries = e.retries + if gotResp != e.wantResp { + t.Errorf("Unexpected response. Want %v, got %v", e.wantResp, gotResp) } - if gotRetries != wantRetries { - t.Errorf("Unexpected number of retries. Want %d, got %d", wantRetries, gotRetries) + if gotErr != e.wantErr { + t.Errorf("Unexpected error. Want %v, got %v", e.wantErr, gotErr) } - if e.wantErr { - if err == nil { - t.Errorf("Expected error") - } - } else { - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - gotBody, _ := ioutil.ReadAll(resp.Body) - if string(gotBody) != wantBody { - t.Errorf("Unexpected response. Want %q, got %q", wantBody, gotBody) - } + if e.wantBodyClosed && !e.wantResp.Body.(*readCloser).closed { + t.Errorf("Expected response body to be closed.") } }) } } -func TestStatusFilterRoundTripper(t *testing.T) { - testServer := func(status int) *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(status) - })) - } +func TestLinearRetry(t *testing.T) { + checkInterval := func(last *time.Time, want time.Duration) { + now := time.Now() + got := now.Sub(*last) + *last = now - goodRT := http.DefaultTransport - errorRT := roundTripperFunc(func(r *http.Request) (*http.Response, error) { - return nil, errors.New("some error") - }) - - filtered := []int{501, 502} + if got < want { + t.Errorf("Unexpected retry interval. Want %v, got %v", want, got) + } + } examples := []struct { - label string - transport http.RoundTripper - status int - err error + label string + interval time.Duration + maxRetries int + responses []bool + wantRetries int }{ { - label: "filtered status", - transport: goodRT, - status: 502, - err: errors.New("Filtering 502"), + label: "atleast once", + interval: 5 * time.Millisecond, + maxRetries: 0, + responses: []bool{true}, + wantRetries: 1, + }, + { + label: "< maxRetries", + interval: 5 * time.Millisecond, + maxRetries: 3, + responses: []bool{false, true}, + wantRetries: 2, }, { - label: "unfiltered status", - transport: goodRT, - status: 503, - err: nil, + label: "= maxRetries", + interval: 10 * time.Millisecond, + maxRetries: 3, + responses: []bool{false, false, true}, + wantRetries: 3, }, { - label: "transport error", - transport: errorRT, - status: 200, - err: errors.New("some error"), + label: "> maxRetries", + interval: 5 * time.Millisecond, + maxRetries: 3, + responses: []bool{false, false, false, true}, + wantRetries: 3, }, } for _, e := range examples { t.Run(e.label, func(t *testing.T) { - ts := testServer(e.status) - defer ts.Close() + var lastRetry time.Time + var got int - rt := newStatusFilterRoundTripper(e.transport, filtered...) + a := func() bool { + checkInterval(&lastRetry, e.interval) - req := httptest.NewRequest("", ts.URL, nil) - resp, err := rt.RoundTrip(req) + ok := e.responses[got] + got++ - if e.err != nil { - if err.Error() != e.err.Error() { - t.Errorf("Unexpected error. Want %v, got %v", e.err, err) - } - } else { - if err != nil { - t.Errorf("Unexpected error %v", err) - } - if resp.StatusCode != e.status { - t.Errorf("Unexpected response status. Want %d, got %d", e.status, resp.StatusCode) - } + return ok + } + + lr := linearRetryer(e.interval, e.maxRetries) + + reported := lr(a) + + if got != e.wantRetries { + t.Errorf("Unexpected retries. Want %d, got %d", e.wantRetries, got) + } + + if reported != e.wantRetries { + t.Errorf("Unexpected retries reported. Want %d, got %d", e.wantRetries, reported) } }) } From 26dc0f86577e7e25cf2b139f8dfba2351a7e7772 Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Wed, 1 Aug 2018 14:50:29 -0400 Subject: [PATCH 10/13] Move non-activator specific components from cmd/activator to cmd/util --- cmd/activator/handlers.go | 70 +----------- cmd/activator/handlers_test.go | 103 ++---------------- cmd/activator/main.go | 7 +- cmd/util/handlers.go | 46 ++++++++ cmd/util/handlers_test.go | 73 +++++++++++++ cmd/util/io.go | 64 +++++++++++ cmd/util/io_test.go | 49 +++++++++ .../round_trippers.go => util/transports.go} | 30 +++-- .../transports_test.go} | 20 ++-- 9 files changed, 275 insertions(+), 187 deletions(-) create mode 100644 cmd/util/handlers.go create mode 100644 cmd/util/handlers_test.go create mode 100644 cmd/util/io.go create mode 100644 cmd/util/io_test.go rename cmd/{activator/round_trippers.go => util/transports.go} (71%) rename cmd/{activator/round_trippers_test.go => util/transports_test.go} (89%) diff --git a/cmd/activator/handlers.go b/cmd/activator/handlers.go index 1df809ea4461..9c3ef73d7086 100644 --- a/cmd/activator/handlers.go +++ b/cmd/activator/handlers.go @@ -14,10 +14,7 @@ limitations under the License. package main import ( - "bytes" "fmt" - "io" - "io/ioutil" "net/http" "net/http/httputil" "net/url" @@ -27,15 +24,15 @@ import ( "go.uber.org/zap" ) -// activationHandler will proxy a request to the active endpoint for the specified revision, -// using the provided transport type activationHandler struct { activator activator.Activator logger *zap.SugaredLogger transport http.RoundTripper } -func newActivationHandler(a activator.Activator, rt http.RoundTripper, l *zap.SugaredLogger) http.Handler { +// activationHandler will proxy a request to the active endpoint for the specified revision, +// using the provided transport +func NewActivationHandler(a activator.Activator, rt http.RoundTripper, l *zap.SugaredLogger) http.Handler { return &activationHandler{activator: a, transport: rt, logger: l} } @@ -67,64 +64,3 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { proxy.ServeHTTP(w, r) } - -// uploadHandler wraps the provided handler with a request body that supports -// re-reading and prevents uploads larger than `maxUploadBytes` -type uploadHandler struct { - http.Handler - MaxUploadBytes int64 -} - -func newUploadHandler(h http.Handler, maxUploadBytes int64) http.Handler { - return &uploadHandler{ - Handler: h, - MaxUploadBytes: maxUploadBytes, - } -} - -func (h *uploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.ContentLength > h.MaxUploadBytes { - w.WriteHeader(http.StatusRequestEntityTooLarge) - return - } - - // The request body cannot be read multiple times for retries. - // The workaround is to clone the request body into a byte reader - // so the body can be read multiple times. - r.Body = newRewinder(r.Body) - - h.Handler.ServeHTTP(w, r) -} - -// rewinder wraps a single-use `ReadCloser` into a `ReadCloser` that can be read multiple times -type rewinder struct { - rc io.ReadCloser - rs io.ReadSeeker -} - -func newRewinder(rc io.ReadCloser) io.ReadCloser { - return &rewinder{rc: rc} -} - -func (r *rewinder) Read(b []byte) (int, error) { - // On the first `Read()`, the contents of `rc` is read into a buffer `rs`. - // This buffer is used for all subsequent reads - if r.rs == nil { - buf, err := ioutil.ReadAll(r.rc) - if err != nil { - return 0, err - } - r.rc.Close() - - r.rs = bytes.NewReader(buf) - } - - return r.rs.Read(b) -} - -func (r *rewinder) Close() error { - // Rewind the buffer on `Close()` for the next call to `Read` - r.rs.Seek(0, io.SeekStart) - - return nil -} diff --git a/cmd/activator/handlers_test.go b/cmd/activator/handlers_test.go index ef44d819cc60..0fc6c71d9c19 100644 --- a/cmd/activator/handlers_test.go +++ b/cmd/activator/handlers_test.go @@ -13,7 +13,6 @@ limitations under the License. package main import ( - "bytes" "errors" "fmt" "io" @@ -24,6 +23,7 @@ import ( "strconv" "testing" + "github.com/knative/serving/cmd/util" "github.com/knative/serving/pkg/activator" "github.com/knative/serving/pkg/controller" "go.uber.org/zap" @@ -110,11 +110,17 @@ func TestActivationHandler(t *testing.T) { for _, e := range examples { t.Run(e.label, func(t *testing.T) { +<<<<<<< HEAD rt := roundTripperFunc(func(r *http.Request) (*http.Response, error) { if r.Host != "" { t.Errorf("Unexpected request host. Want %q, got %q", "", r.Host) } +||||||| merged common ancestors + rt := roundTripperFunc(func(r *http.Request) (*http.Response, error) { +======= + rt := util.RoundTripperFunc(func(r *http.Request) (*http.Response, error) { +>>>>>>> Move non-activator specific components from cmd/activator to cmd/util if e.wantErr != nil { return nil, e.wantErr } @@ -122,7 +128,7 @@ func TestActivationHandler(t *testing.T) { return http.DefaultTransport.RoundTrip(r) }) - handler := newActivationHandler(act, rt, logger) + handler := NewActivationHandler(act, rt, logger) resp := httptest.NewRecorder() @@ -143,96 +149,3 @@ func TestActivationHandler(t *testing.T) { }) } } - -func TestUploadHandler(t *testing.T) { - payload := "SAMPLE PAYLOAD" - - examples := []struct { - label string - maxUpload int - status int - }{ - { - label: "under", - maxUpload: len(payload) + 1, - status: http.StatusOK, - }, - { - label: "equal", - maxUpload: len(payload), - status: http.StatusOK, - }, - { - label: "over", - maxUpload: len(payload) - 1, - status: http.StatusRequestEntityTooLarge, - }, - } - - for _, e := range examples { - t.Run(e.label, func(t *testing.T) { - baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - b1, _ := ioutil.ReadAll(r.Body) - r.Body.Close() - - b2, _ := ioutil.ReadAll(r.Body) - r.Body.Close() - - if string(b1) != payload || string(b2) != payload { - t.Errorf("Expected request body to be rereadable. Want %q, got %q and %q.", payload, b1, b2) - } - }) - handler := newUploadHandler(baseHandler, int64(e.maxUpload)) - - resp := httptest.NewRecorder() - req := httptest.NewRequest("POST", "http://example.com", bytes.NewBufferString(payload)) - - handler.ServeHTTP(resp, req) - - if resp.Code != e.status { - t.Errorf("Unexpected response status for payload %q. Want %d, got %d", payload, e.status, resp.Code) - } - }) - } -} - -type readCloser struct { - io.Reader - closed bool -} - -func (rc *readCloser) Close() error { - rc.closed = true - - return nil -} - -func TestRewinder(t *testing.T) { - str := "test string" - rc := &readCloser{bytes.NewBufferString(str), false} - rewinder := newRewinder(rc) - - b1, err := ioutil.ReadAll(rewinder) - if err != nil { - t.Errorf("Unexpected error reading b1: %v", err) - } - rewinder.Close() - - b2, err := ioutil.ReadAll(rewinder) - if err != nil { - t.Errorf("Unexpected error reading b2: %v", err) - } - rewinder.Close() - - if string(b1) != str { - t.Errorf("Unexpected str b1. Want %q, got %q", str, b1) - } - - if string(b2) != str { - t.Errorf("Unexpected str b2. Want %q, got %q", str, b2) - } - - if !rc.closed { - t.Errorf("Expected ReadCloser to be closed") - } -} diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 0262e3d36ec7..0a9ad611f18f 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -26,11 +26,11 @@ import ( "github.com/knative/pkg/configmap" "github.com/knative/pkg/signals" + "github.com/knative/serving/cmd/util" "github.com/knative/serving/pkg/activator" clientset "github.com/knative/serving/pkg/client/clientset/versioned" "github.com/knative/serving/pkg/configmap" - h2cutil "github.com/knative/serving/pkg/h2c" "github.com/knative/serving/pkg/logging" "github.com/knative/serving/pkg/system" "github.com/knative/serving/third_party/h2c" @@ -99,8 +99,9 @@ func main() { // Retry on 503's for up to 60 seconds. The reason is there is // a small delay for k8s to include the ready IP in service. // https://github.com/knative/serving/issues/660#issuecomment-384062553 - rt := newHttpRoundTripper(http.DefaultTransport, h2cutil.DefaultTransport) - rt = newRetryRoundTripper(rt, logger, linearRetryer(retryInterval, maxRetries), retry503) + shouldRetry := util.ShouldRetryStatus(http.StatusServiceUnavailable) + retrier := util.LinearRetryer(retryInterval, maxRetries) + rt := util.NewRetryRoundTripper(util.AutoTransport, logger, retrier, shouldRetry) ah := newActivationHandler(a, rt, logger) ah = newUploadHandler(ah, defaultMaxUploadBytes) diff --git a/cmd/util/handlers.go b/cmd/util/handlers.go new file mode 100644 index 000000000000..f70e3a06153a --- /dev/null +++ b/cmd/util/handlers.go @@ -0,0 +1,46 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "net/http" +) + +// uploadHandler wraps the provided handler with a request body that supports +// re-reading and prevents uploads larger than `maxUploadBytes` +type uploadHandler struct { + http.Handler + MaxUploadBytes int64 +} + +func NewUploadHandler(h http.Handler, maxUploadBytes int64) http.Handler { + return &uploadHandler{ + Handler: h, + MaxUploadBytes: maxUploadBytes, + } +} + +func (h *uploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.ContentLength > h.MaxUploadBytes { + w.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + + // The request body cannot be read multiple times for retries. + // The workaround is to clone the request body into a byte reader + // so the body can be read multiple times. + r.Body = NewRewinder(r.Body) + + h.Handler.ServeHTTP(w, r) +} diff --git a/cmd/util/handlers_test.go b/cmd/util/handlers_test.go new file mode 100644 index 000000000000..3f9f3958aab5 --- /dev/null +++ b/cmd/util/handlers_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package util + +import ( + "bytes" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" +) + +func TestUploadHandler(t *testing.T) { + payload := "SAMPLE PAYLOAD" + + examples := []struct { + label string + maxUpload int + status int + }{ + { + label: "under", + maxUpload: len(payload) + 1, + status: http.StatusOK, + }, + { + label: "equal", + maxUpload: len(payload), + status: http.StatusOK, + }, + { + label: "over", + maxUpload: len(payload) - 1, + status: http.StatusRequestEntityTooLarge, + }, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b1, _ := ioutil.ReadAll(r.Body) + r.Body.Close() + + b2, _ := ioutil.ReadAll(r.Body) + r.Body.Close() + + if string(b1) != payload || string(b2) != payload { + t.Errorf("Expected request body to be rereadable. Want %q, got %q and %q.", payload, b1, b2) + } + }) + handler := NewUploadHandler(baseHandler, int64(e.maxUpload)) + + resp := httptest.NewRecorder() + req := httptest.NewRequest("POST", "http://example.com", bytes.NewBufferString(payload)) + + handler.ServeHTTP(resp, req) + + if resp.Code != e.status { + t.Errorf("Unexpected response status for payload %q. Want %d, got %d", payload, e.status, resp.Code) + } + }) + } +} diff --git a/cmd/util/io.go b/cmd/util/io.go new file mode 100644 index 000000000000..cdfc6cddf78d --- /dev/null +++ b/cmd/util/io.go @@ -0,0 +1,64 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "bytes" + "io" + "io/ioutil" +) + +type SpyCloser struct { + io.Reader + Closed bool +} + +func (sc *SpyCloser) Close() error { + sc.Closed = true + + return nil +} + +type rewinder struct { + rc io.ReadCloser + rs io.ReadSeeker +} + +// rewinder wraps a single-use `ReadCloser` into a `ReadCloser` that can be read multiple times +func NewRewinder(rc io.ReadCloser) io.ReadCloser { + return &rewinder{rc: rc} +} + +func (r *rewinder) Read(b []byte) (int, error) { + // On the first `Read()`, the contents of `rc` is read into a buffer `rs`. + // This buffer is used for all subsequent reads + if r.rs == nil { + buf, err := ioutil.ReadAll(r.rc) + if err != nil { + return 0, err + } + r.rc.Close() + + r.rs = bytes.NewReader(buf) + } + + return r.rs.Read(b) +} + +func (r *rewinder) Close() error { + // Rewind the buffer on `Close()` for the next call to `Read` + r.rs.Seek(0, io.SeekStart) + + return nil +} diff --git a/cmd/util/io_test.go b/cmd/util/io_test.go new file mode 100644 index 000000000000..313a9571668a --- /dev/null +++ b/cmd/util/io_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package util + +import ( + "bytes" + "io/ioutil" + "testing" +) + +func TestRewinder(t *testing.T) { + str := "test string" + rc := &SpyCloser{Reader: bytes.NewBufferString(str)} + rewinder := NewRewinder(rc) + + b1, err := ioutil.ReadAll(rewinder) + if err != nil { + t.Errorf("Unexpected error reading b1: %v", err) + } + rewinder.Close() + + b2, err := ioutil.ReadAll(rewinder) + if err != nil { + t.Errorf("Unexpected error reading b2: %v", err) + } + rewinder.Close() + + if string(b1) != str { + t.Errorf("Unexpected str b1. Want %q, got %q", str, b1) + } + + if string(b2) != str { + t.Errorf("Unexpected str b2. Want %q, got %q", str, b2) + } + + if !rc.Closed { + t.Errorf("Expected ReadCloser to be closed") + } +} diff --git a/cmd/activator/round_trippers.go b/cmd/util/transports.go similarity index 71% rename from cmd/activator/round_trippers.go rename to cmd/util/transports.go index e5f5d2a04d41..5c8fa150ee6a 100644 --- a/cmd/activator/round_trippers.go +++ b/cmd/util/transports.go @@ -11,15 +11,22 @@ See the License for the specific language governing permissions and limitations under the License. */ -package main +package util import ( "net/http" "time" + h2cutil "github.com/knative/serving/pkg/h2c" "go.uber.org/zap" ) +type RoundTripperFunc func(*http.Request) (*http.Response, error) + +func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return rt(r) +} + // httpRoundTripper will use the appropriate transport for the request's http protocol version type httpRoundTripper struct { v1 http.RoundTripper @@ -39,9 +46,12 @@ func (rt *httpRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { return t.RoundTrip(r) } -type retryer func(func() bool) int +// AutoTransport uses h2c for HTTPv2 requests and falls back to `http.DefaultTransport` for all others +var AutoTransport = newHttpRoundTripper(http.DefaultTransport, h2cutil.DefaultTransport) -func linearRetryer(interval time.Duration, maxRetries int) retryer { +type Retryer func(func() bool) int + +func LinearRetryer(interval time.Duration, maxRetries int) Retryer { return func(action func() bool) (retries int) { for retries = 1; !action() && retries < maxRetries; retries++ { time.Sleep(interval) @@ -50,21 +60,23 @@ func linearRetryer(interval time.Duration, maxRetries int) retryer { } } -type shouldRetryFunc func(*http.Response) bool +type ShouldRetryFunc func(*http.Response) bool -func retry503(resp *http.Response) bool { - return resp.StatusCode == http.StatusServiceUnavailable +func ShouldRetryStatus(status int) ShouldRetryFunc { + return func(resp *http.Response) bool { + return resp.StatusCode == status + } } type retryRoundTripper struct { logger *zap.SugaredLogger transport http.RoundTripper - retry retryer - shouldRetry shouldRetryFunc + retry Retryer + shouldRetry ShouldRetryFunc } // retryRoundTripper retries a request on error or `shouldRetry` condition, using the given `retry` strategy -func newRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, r retryer, sr shouldRetryFunc) http.RoundTripper { +func NewRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, r Retryer, sr ShouldRetryFunc) http.RoundTripper { return &retryRoundTripper{ logger: l, transport: rt, diff --git a/cmd/activator/round_trippers_test.go b/cmd/util/transports_test.go similarity index 89% rename from cmd/activator/round_trippers_test.go rename to cmd/util/transports_test.go index a05301d1c5c0..08899b7bc09f 100644 --- a/cmd/activator/round_trippers_test.go +++ b/cmd/util/transports_test.go @@ -10,7 +10,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package main +package util import ( "errors" @@ -21,16 +21,10 @@ import ( "go.uber.org/zap" ) -type roundTripperFunc func(*http.Request) (*http.Response, error) - -func (rt roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { - return rt(r) -} - func TestHttpRoundTripper(t *testing.T) { wants := map[string]bool{} frt := func(key string) http.RoundTripper { - return roundTripperFunc(func(r *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { wants[key] = true return nil, nil @@ -83,7 +77,7 @@ func TestRetryRoundTripper(t *testing.T) { badStatus := 500 resp := func(status int) *http.Response { - return &http.Response{StatusCode: status, Body: &readCloser{}} + return &http.Response{StatusCode: status, Body: &SpyCloser{}} } someErr := errors.New("some error") @@ -125,7 +119,7 @@ func TestRetryRoundTripper(t *testing.T) { for _, e := range examples { t.Run(e.label, func(t *testing.T) { - transport := roundTripperFunc(func(r *http.Request) (*http.Response, error) { + transport := RoundTripperFunc(func(r *http.Request) (*http.Response, error) { return e.wantResp, e.wantErr }) @@ -140,7 +134,7 @@ func TestRetryRoundTripper(t *testing.T) { return 1 } - rt := newRetryRoundTripper(transport, logger, retry, shouldRetry) + rt := NewRetryRoundTripper(transport, logger, retry, shouldRetry) gotResp, gotErr := rt.RoundTrip(req) @@ -152,7 +146,7 @@ func TestRetryRoundTripper(t *testing.T) { t.Errorf("Unexpected error. Want %v, got %v", e.wantErr, gotErr) } - if e.wantBodyClosed && !e.wantResp.Body.(*readCloser).closed { + if e.wantBodyClosed && !e.wantResp.Body.(*SpyCloser).Closed { t.Errorf("Expected response body to be closed.") } }) @@ -221,7 +215,7 @@ func TestLinearRetry(t *testing.T) { return ok } - lr := linearRetryer(e.interval, e.maxRetries) + lr := LinearRetryer(e.interval, e.maxRetries) reported := lr(a) From 503aea894da7ec6793d9c03d7d9f417cbdab8123 Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Wed, 1 Aug 2018 17:34:08 -0400 Subject: [PATCH 11/13] Move activation handler into pkg/activator --- .../handlers.go => pkg/activator/handler.go | 7 +++---- .../activator/handler_test.go | 21 +++++++++---------- 2 files changed, 13 insertions(+), 15 deletions(-) rename cmd/activator/handlers.go => pkg/activator/handler.go (89%) rename cmd/activator/handlers_test.go => pkg/activator/handler_test.go (85%) diff --git a/cmd/activator/handlers.go b/pkg/activator/handler.go similarity index 89% rename from cmd/activator/handlers.go rename to pkg/activator/handler.go index 9c3ef73d7086..a7972261ab30 100644 --- a/cmd/activator/handlers.go +++ b/pkg/activator/handler.go @@ -11,7 +11,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package main +package activator import ( "fmt" @@ -19,20 +19,19 @@ import ( "net/http/httputil" "net/url" - "github.com/knative/serving/pkg/activator" "github.com/knative/serving/pkg/controller" "go.uber.org/zap" ) type activationHandler struct { - activator activator.Activator + activator Activator logger *zap.SugaredLogger transport http.RoundTripper } // activationHandler will proxy a request to the active endpoint for the specified revision, // using the provided transport -func NewActivationHandler(a activator.Activator, rt http.RoundTripper, l *zap.SugaredLogger) http.Handler { +func NewActivationHandler(a Activator, rt http.RoundTripper, l *zap.SugaredLogger) http.Handler { return &activationHandler{activator: a, transport: rt, logger: l} } diff --git a/cmd/activator/handlers_test.go b/pkg/activator/handler_test.go similarity index 85% rename from cmd/activator/handlers_test.go rename to pkg/activator/handler_test.go index 0fc6c71d9c19..db3edaf05c6e 100644 --- a/cmd/activator/handlers_test.go +++ b/pkg/activator/handler_test.go @@ -10,7 +10,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package main +package activator import ( "errors" @@ -24,38 +24,37 @@ import ( "testing" "github.com/knative/serving/cmd/util" - "github.com/knative/serving/pkg/activator" "github.com/knative/serving/pkg/controller" "go.uber.org/zap" ) -type fakeActivator struct { - endpoint activator.Endpoint +type stubActivator struct { + endpoint Endpoint namespace string name string } -func newFakeActivator(namespace string, name string, server *httptest.Server) activator.Activator { +func newStubActivator(namespace string, name string, server *httptest.Server) Activator { url, _ := url.Parse(server.URL) host := url.Hostname() port, _ := strconv.Atoi(url.Port()) - return &fakeActivator{ - endpoint: activator.Endpoint{FQDN: host, Port: int32(port)}, + return &stubActivator{ + endpoint: Endpoint{FQDN: host, Port: int32(port)}, namespace: namespace, name: name, } } -func (fa *fakeActivator) ActiveEndpoint(namespace, name string) (activator.Endpoint, activator.Status, error) { +func (fa *stubActivator) ActiveEndpoint(namespace, name string) (Endpoint, Status, error) { if namespace == fa.namespace && name == fa.name { return fa.endpoint, http.StatusOK, nil } - return activator.Endpoint{}, http.StatusNotFound, errors.New("not found!") + return Endpoint{}, http.StatusNotFound, errors.New("not found!") } -func (fa *fakeActivator) Shutdown() { +func (fa *stubActivator) Shutdown() { } func TestActivationHandler(t *testing.T) { @@ -72,7 +71,7 @@ func TestActivationHandler(t *testing.T) { ) defer server.Close() - act := newFakeActivator("real-namespace", "real-name", server) + act := newStubActivator("real-namespace", "real-name", server) examples := []struct { label string From 79fccdbeb5cc9e11e97aa0fcbf467a20014850ec Mon Sep 17 00:00:00 2001 From: Tanzeeb Khalili Date: Wed, 1 Aug 2018 19:31:39 -0400 Subject: [PATCH 12/13] Add a more fluent interface to RetryRoundTripper --- cmd/activator/main.go | 6 +-- cmd/util/transports.go | 75 ++++++++++++++++++++----------------- cmd/util/transports_test.go | 71 ++++++++++++++++++++++------------- 3 files changed, 87 insertions(+), 65 deletions(-) diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 0a9ad611f18f..a9f863f2e09b 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -99,9 +99,9 @@ func main() { // Retry on 503's for up to 60 seconds. The reason is there is // a small delay for k8s to include the ready IP in service. // https://github.com/knative/serving/issues/660#issuecomment-384062553 - shouldRetry := util.ShouldRetryStatus(http.StatusServiceUnavailable) - retrier := util.LinearRetryer(retryInterval, maxRetries) - rt := util.NewRetryRoundTripper(util.AutoTransport, logger, retrier, shouldRetry) + shouldRetry := util.RetryStatus(http.StatusServiceUnavailable) + retryer := util.NewLinearRetryer(retryInterval, maxRetries) + rt := util.NewRetryRoundTripper(util.AutoTransport, logger, retryer, shouldRetry) ah := newActivationHandler(a, rt, logger) ah = newUploadHandler(ah, defaultMaxUploadBytes) diff --git a/cmd/util/transports.go b/cmd/util/transports.go index 5c8fa150ee6a..dea0bb006caa 100644 --- a/cmd/util/transports.go +++ b/cmd/util/transports.go @@ -27,66 +27,69 @@ func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return rt(r) } -// httpRoundTripper will use the appropriate transport for the request's http protocol version -type httpRoundTripper struct { - v1 http.RoundTripper - v2 http.RoundTripper -} +// HttpTransport will use the appropriate transport for the request's http protocol version +func NewHttpTransport(v1 http.RoundTripper, v2 http.RoundTripper) http.RoundTripper { + return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + var t http.RoundTripper = v1 + if r.ProtoMajor == 2 { + t = v2 + } -func newHttpRoundTripper(v1 http.RoundTripper, v2 http.RoundTripper) http.RoundTripper { - return &httpRoundTripper{v1: v1, v2: v2} + return t.RoundTrip(r) + }) } -func (rt *httpRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - var t http.RoundTripper = rt.v1 - if r.ProtoMajor == 2 { - t = rt.v2 - } +// AutoTransport uses h2c for HTTPv2 requests and falls back to `http.DefaultTransport` for all others +var AutoTransport = NewHttpTransport(http.DefaultTransport, h2cutil.DefaultTransport) - return t.RoundTrip(r) +type Retryer interface { + Retry(func() bool) int } -// AutoTransport uses h2c for HTTPv2 requests and falls back to `http.DefaultTransport` for all others -var AutoTransport = newHttpRoundTripper(http.DefaultTransport, h2cutil.DefaultTransport) +type RetryerFunc func(func() bool) int -type Retryer func(func() bool) int +func (r RetryerFunc) Retry(f func() bool) int { + return r(f) +} -func LinearRetryer(interval time.Duration, maxRetries int) Retryer { - return func(action func() bool) (retries int) { +// LinearRetryer will retry `action` up to `maxRetries` times with `interval` delay between retries +func NewLinearRetryer(interval time.Duration, maxRetries int) Retryer { + return RetryerFunc(func(action func() bool) (retries int) { for retries = 1; !action() && retries < maxRetries; retries++ { time.Sleep(interval) } return - } + }) } -type ShouldRetryFunc func(*http.Response) bool +type RetryCond func(*http.Response) bool -func ShouldRetryStatus(status int) ShouldRetryFunc { +// RetryStatus will filter responses matching `status` +func RetryStatus(status int) RetryCond { return func(resp *http.Response) bool { return resp.StatusCode == status } } type retryRoundTripper struct { - logger *zap.SugaredLogger - transport http.RoundTripper - retry Retryer - shouldRetry ShouldRetryFunc + logger *zap.SugaredLogger + transport http.RoundTripper + retryer Retryer + retryConds []RetryCond } -// retryRoundTripper retries a request on error or `shouldRetry` condition, using the given `retry` strategy -func NewRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, r Retryer, sr ShouldRetryFunc) http.RoundTripper { +// RetryRoundTripper retries a request on error or retry condition, using the given `retry` strategy +func NewRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, r Retryer, sr ...RetryCond) http.RoundTripper { return &retryRoundTripper{ - logger: l, - transport: rt, - retry: r, - shouldRetry: sr, + logger: l, + transport: rt, + retryer: r, + retryConds: sr, } } func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (resp *http.Response, err error) { - attempts := rrt.retry(func() bool { + attempts := rrt.retryer.Retry(func() bool { resp, err = rrt.transport.RoundTrip(r) if err != nil { @@ -94,9 +97,11 @@ func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (resp *http.Response, e return true } - if rrt.shouldRetry(resp) { - resp.Body.Close() - return true + for _, retryCond := range rrt.retryConds { + if retryCond(resp) { + resp.Body.Close() + return true + } } return false diff --git a/cmd/util/transports_test.go b/cmd/util/transports_test.go index 08899b7bc09f..27e503978f38 100644 --- a/cmd/util/transports_test.go +++ b/cmd/util/transports_test.go @@ -31,7 +31,7 @@ func TestHttpRoundTripper(t *testing.T) { }) } - rt := newHttpRoundTripper(frt("v1"), frt("v2")) + rt := NewHttpTransport(frt("v1"), frt("v2")) examples := []struct { label string @@ -74,7 +74,8 @@ func TestRetryRoundTripper(t *testing.T) { req := &http.Request{} goodStatus := 200 - badStatus := 500 + badStatus1 := 500 + badStatus2 := 400 resp := func(status int) *http.Response { return &http.Response{StatusCode: status, Body: &SpyCloser{}} @@ -83,35 +84,53 @@ func TestRetryRoundTripper(t *testing.T) { someErr := errors.New("some error") logger := zap.NewExample().Sugar() - shouldRetry := func(resp *http.Response) bool { - return resp.StatusCode == badStatus - } + conditions := []RetryCond{RetryStatus(badStatus1), RetryStatus(badStatus2)} examples := []struct { label string - wantResp *http.Response - wantErr error + resp *http.Response + err error + cond []RetryCond wantRetry bool wantBodyClosed bool }{ { label: "no retry", - wantResp: resp(goodStatus), - wantErr: nil, + resp: resp(goodStatus), + err: nil, + cond: conditions, + wantRetry: false, + wantBodyClosed: false, + }, + { + label: "no conditions", + resp: resp(badStatus1), + err: nil, + cond: []RetryCond{}, wantRetry: false, wantBodyClosed: false, }, { label: "retry on error", - wantResp: nil, - wantErr: someErr, + resp: nil, + err: someErr, + cond: conditions, wantRetry: true, wantBodyClosed: false, }, { - label: "retry on condition", - wantResp: resp(badStatus), - wantErr: nil, + label: "retry on condition 1", + resp: resp(badStatus1), + err: nil, + cond: conditions, + wantRetry: true, + wantBodyClosed: true, + }, + { + label: "retry on condition 2", + resp: resp(badStatus2), + err: nil, + cond: conditions, wantRetry: true, wantBodyClosed: true, }, @@ -120,33 +139,31 @@ func TestRetryRoundTripper(t *testing.T) { for _, e := range examples { t.Run(e.label, func(t *testing.T) { transport := RoundTripperFunc(func(r *http.Request) (*http.Response, error) { - return e.wantResp, e.wantErr + return e.resp, e.err }) - retry := func(a func() bool) int { + retry := RetryerFunc(func(a func() bool) int { if a() { if !e.wantRetry { t.Errorf("Unexpected retry.") } - } - return 1 - } + }) - rt := NewRetryRoundTripper(transport, logger, retry, shouldRetry) + rt := NewRetryRoundTripper(transport, logger, retry, e.cond...) gotResp, gotErr := rt.RoundTrip(req) - if gotResp != e.wantResp { - t.Errorf("Unexpected response. Want %v, got %v", e.wantResp, gotResp) + if gotResp != e.resp { + t.Errorf("Unexpected response. Want %v, got %v", e.resp, gotResp) } - if gotErr != e.wantErr { - t.Errorf("Unexpected error. Want %v, got %v", e.wantErr, gotErr) + if gotErr != e.err { + t.Errorf("Unexpected error. Want %v, got %v", e.err, gotErr) } - if e.wantBodyClosed && !e.wantResp.Body.(*SpyCloser).Closed { + if e.wantBodyClosed && !e.resp.Body.(*SpyCloser).Closed { t.Errorf("Expected response body to be closed.") } }) @@ -215,9 +232,9 @@ func TestLinearRetry(t *testing.T) { return ok } - lr := LinearRetryer(e.interval, e.maxRetries) + lr := NewLinearRetryer(e.interval, e.maxRetries) - reported := lr(a) + reported := lr.Retry(a) if got != e.wantRetries { t.Errorf("Unexpected retries. Want %d, got %d", e.wantRetries, got) From 41287bac7a4423171cd1f920850d3533d59d0ebe Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Thu, 9 Aug 2018 09:36:04 -0400 Subject: [PATCH 13/13] Rebase changes and further refactor stats reporting Co-authored-by: Nader Ziada Co-authored-by: David Protasowski --- cmd/activator/main.go | 42 ++--- cmd/util/handlers.go | 46 ----- pkg/activator/activator.go | 3 +- pkg/activator/handler/coverage.out | 12 ++ .../handler/enforce_length_handler.go | 33 ++++ .../handler/enforce_length_handler_test.go | 17 +- pkg/activator/{ => handler}/handler.go | 34 ++-- pkg/activator/{ => handler}/handler_test.go | 42 ++--- pkg/activator/handler/reporting_handler.go | 65 +++++++ .../handler/reporting_handler_test.go | 159 ++++++++++++++++++ pkg/activator/util/coverage.out | 34 ++++ pkg/activator/util/io_test.go | 35 ++++ pkg/activator/util/retryer.go | 38 +++++ pkg/activator/util/retryer_test.go | 95 +++++++++++ .../io.go => pkg/activator/util/rewinder.go | 11 -- .../activator/util/rewinder_test.go | 2 +- {cmd => pkg/activator}/util/transports.go | 46 +++-- .../activator}/util/transports_test.go | 150 ++++++----------- 18 files changed, 607 insertions(+), 257 deletions(-) delete mode 100644 cmd/util/handlers.go create mode 100644 pkg/activator/handler/coverage.out create mode 100644 pkg/activator/handler/enforce_length_handler.go rename cmd/util/handlers_test.go => pkg/activator/handler/enforce_length_handler_test.go (78%) rename pkg/activator/{ => handler}/handler.go (58%) rename pkg/activator/{ => handler}/handler_test.go (75%) create mode 100644 pkg/activator/handler/reporting_handler.go create mode 100644 pkg/activator/handler/reporting_handler_test.go create mode 100644 pkg/activator/util/coverage.out create mode 100644 pkg/activator/util/io_test.go create mode 100644 pkg/activator/util/retryer.go create mode 100644 pkg/activator/util/retryer_test.go rename cmd/util/io.go => pkg/activator/util/rewinder.go (91%) rename cmd/util/io_test.go => pkg/activator/util/rewinder_test.go (95%) rename {cmd => pkg/activator}/util/transports.go (71%) rename {cmd => pkg/activator}/util/transports_test.go (52%) diff --git a/cmd/activator/main.go b/cmd/activator/main.go index a9f863f2e09b..be4a39dcfcdc 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -26,10 +26,10 @@ import ( "github.com/knative/pkg/configmap" "github.com/knative/pkg/signals" - "github.com/knative/serving/cmd/util" "github.com/knative/serving/pkg/activator" + activatorhandler "github.com/knative/serving/pkg/activator/handler" + activatorutil "github.com/knative/serving/pkg/activator/util" clientset "github.com/knative/serving/pkg/client/clientset/versioned" - "github.com/knative/serving/pkg/configmap" "github.com/knative/serving/pkg/logging" "github.com/knative/serving/pkg/system" @@ -42,13 +42,10 @@ import ( ) const ( - maxUploadBytes = 32e6 // 32MB - same as app engine - maxRetry = 60 - retryInterval = 1 * time.Second - logLevelKey = "activator" - defaultMaxUploadBytes = 32e6 // 32MB - same as app engine - defaultMaxRetries = 60 - defaultRetryInterval = 1 * time.Second + maxUploadBytes = 32e6 // 32MB - same as app engine + maxRetries = 60 + retryInterval = 1 * time.Second + logLevelKey = "activator" ) func main() { @@ -94,17 +91,26 @@ func main() { a := activator.NewRevisionActivator(kubeClient, servingClient, logger, reporter) a = activator.NewDedupingActivator(a) - ah := &activationHandler{a, logger} // Retry on 503's for up to 60 seconds. The reason is there is // a small delay for k8s to include the ready IP in service. // https://github.com/knative/serving/issues/660#issuecomment-384062553 - shouldRetry := util.RetryStatus(http.StatusServiceUnavailable) - retryer := util.NewLinearRetryer(retryInterval, maxRetries) - rt := util.NewRetryRoundTripper(util.AutoTransport, logger, retryer, shouldRetry) - - ah := newActivationHandler(a, rt, logger) - ah = newUploadHandler(ah, defaultMaxUploadBytes) + shouldRetry := activatorutil.RetryStatus(http.StatusServiceUnavailable) + retryer := activatorutil.NewLinearRetryer(retryInterval, maxRetries) + + rt := activatorutil.NewRetryRoundTripper(activatorutil.AutoTransport, logger, retryer, shouldRetry) + + ah := &activatorhandler.ReportingHTTPHandler{ + Reporter: reporter, + NextHandler: &activatorhandler.EnforceMaxContentLengthHandler{ + MaxContentLengthBytes: maxUploadBytes, + NextHandler: &activatorhandler.ActivationHandler{ + Activator: a, + Transport: rt, + Logger: logger, + }, + }, + } // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() @@ -122,9 +128,7 @@ func main() { // Start the endpoint for Prometheus scraping mux := http.NewServeMux() - mux.HandleFunc("/", ah.handler) + mux.HandleFunc("/", ah.ServeHTTP) mux.Handle("/metrics", promExporter) h2c.ListenAndServe(":8080", mux) - http.Handle("/", ah) - h2c.ListenAndServe(":8080", nil) } diff --git a/cmd/util/handlers.go b/cmd/util/handlers.go deleted file mode 100644 index f70e3a06153a..000000000000 --- a/cmd/util/handlers.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2018 The Knative Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "net/http" -) - -// uploadHandler wraps the provided handler with a request body that supports -// re-reading and prevents uploads larger than `maxUploadBytes` -type uploadHandler struct { - http.Handler - MaxUploadBytes int64 -} - -func NewUploadHandler(h http.Handler, maxUploadBytes int64) http.Handler { - return &uploadHandler{ - Handler: h, - MaxUploadBytes: maxUploadBytes, - } -} - -func (h *uploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.ContentLength > h.MaxUploadBytes { - w.WriteHeader(http.StatusRequestEntityTooLarge) - return - } - - // The request body cannot be read multiple times for retries. - // The workaround is to clone the request body into a byte reader - // so the body can be read multiple times. - r.Body = NewRewinder(r.Body) - - h.Handler.ServeHTTP(w, r) -} diff --git a/pkg/activator/activator.go b/pkg/activator/activator.go index 2ff3d28e23de..469131b034a4 100644 --- a/pkg/activator/activator.go +++ b/pkg/activator/activator.go @@ -17,7 +17,8 @@ package activator const ( // The name of the activator service. - K8sServiceName = "activator-service" + K8sServiceName = "activator-service" + ResponseCountHTTPHeader = "X-Activator-Num-Retries" ) // Status is an HTTP status code. diff --git a/pkg/activator/handler/coverage.out b/pkg/activator/handler/coverage.out new file mode 100644 index 000000000000..18bcbc777ce2 --- /dev/null +++ b/pkg/activator/handler/coverage.out @@ -0,0 +1,12 @@ +mode: set +github.com/knative/serving/pkg/activator/handler/enforce_length_handler.go:26.92,27.47 1 1 +github.com/knative/serving/pkg/activator/handler/enforce_length_handler.go:32.2,32.31 1 1 +github.com/knative/serving/pkg/activator/handler/enforce_length_handler.go:27.47,30.3 2 1 +github.com/knative/serving/pkg/activator/handler/handler.go:35.90,37.2 1 1 +github.com/knative/serving/pkg/activator/handler/handler.go:39.79,46.16 5 1 +github.com/knative/serving/pkg/activator/handler/handler.go:54.2,66.23 5 1 +github.com/knative/serving/pkg/activator/handler/handler.go:46.16,52.3 4 1 +github.com/knative/serving/pkg/activator/handler/reporting_handler.go:30.82,46.87 9 1 +github.com/knative/serving/pkg/activator/handler/reporting_handler.go:51.2,51.79 1 1 +github.com/knative/serving/pkg/activator/handler/reporting_handler.go:46.87,49.3 2 1 +github.com/knative/serving/pkg/activator/handler/reporting_handler.go:59.53,63.2 2 1 diff --git a/pkg/activator/handler/enforce_length_handler.go b/pkg/activator/handler/enforce_length_handler.go new file mode 100644 index 000000000000..b4b2da84bae3 --- /dev/null +++ b/pkg/activator/handler/enforce_length_handler.go @@ -0,0 +1,33 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "net/http" +) + +// EnforceMaxContentLengthHandler prevents uploads larger than `MaxContentLengthBytes` +type EnforceMaxContentLengthHandler struct { + NextHandler http.Handler + MaxContentLengthBytes int64 +} + +func (h *EnforceMaxContentLengthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.ContentLength > h.MaxContentLengthBytes { + w.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + + h.NextHandler.ServeHTTP(w, r) +} diff --git a/cmd/util/handlers_test.go b/pkg/activator/handler/enforce_length_handler_test.go similarity index 78% rename from cmd/util/handlers_test.go rename to pkg/activator/handler/enforce_length_handler_test.go index 3f9f3958aab5..b579bc3a54b9 100644 --- a/cmd/util/handlers_test.go +++ b/pkg/activator/handler/enforce_length_handler_test.go @@ -10,17 +10,16 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package util +package handler import ( "bytes" - "io/ioutil" "net/http" "net/http/httptest" "testing" ) -func TestUploadHandler(t *testing.T) { +func TestEnforceMaxContentLengthHandler(t *testing.T) { payload := "SAMPLE PAYLOAD" examples := []struct { @@ -48,17 +47,9 @@ func TestUploadHandler(t *testing.T) { for _, e := range examples { t.Run(e.label, func(t *testing.T) { baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - b1, _ := ioutil.ReadAll(r.Body) - r.Body.Close() - - b2, _ := ioutil.ReadAll(r.Body) - r.Body.Close() - - if string(b1) != payload || string(b2) != payload { - t.Errorf("Expected request body to be rereadable. Want %q, got %q and %q.", payload, b1, b2) - } + w.WriteHeader(http.StatusOK) }) - handler := NewUploadHandler(baseHandler, int64(e.maxUpload)) + handler := EnforceMaxContentLengthHandler{NextHandler: baseHandler, MaxContentLengthBytes: int64(e.maxUpload)} resp := httptest.NewRecorder() req := httptest.NewRequest("POST", "http://example.com", bytes.NewBufferString(payload)) diff --git a/pkg/activator/handler.go b/pkg/activator/handler/handler.go similarity index 58% rename from pkg/activator/handler.go rename to pkg/activator/handler/handler.go index a7972261ab30..d79464f0976b 100644 --- a/pkg/activator/handler.go +++ b/pkg/activator/handler/handler.go @@ -11,7 +11,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package activator +package handler import ( "fmt" @@ -19,33 +19,31 @@ import ( "net/http/httputil" "net/url" - "github.com/knative/serving/pkg/controller" + "github.com/knative/serving/pkg/activator" + "github.com/knative/serving/pkg/reconciler" "go.uber.org/zap" ) -type activationHandler struct { - activator Activator - logger *zap.SugaredLogger - transport http.RoundTripper +// ActivationHandler will wait for an active endpoint for a revision +// to be available before proxing the request +type ActivationHandler struct { + Activator activator.Activator + Logger *zap.SugaredLogger + Transport http.RoundTripper } -// activationHandler will proxy a request to the active endpoint for the specified revision, -// using the provided transport -func NewActivationHandler(a Activator, rt http.RoundTripper, l *zap.SugaredLogger) http.Handler { - return &activationHandler{activator: a, transport: rt, logger: l} -} +func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace()) + name := r.Header.Get(reconciler.GetRevisionHeaderName()) + config := r.Header.Get(reconciler.GetConfigurationHeader()) -func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - namespace := r.Header.Get(controller.GetRevisionHeaderNamespace()) - name := r.Header.Get(controller.GetRevisionHeaderName()) + endpoint, status, err := a.Activator.ActiveEndpoint(namespace, config, name) - endpoint, status, err := a.activator.ActiveEndpoint(namespace, name) if err != nil { msg := fmt.Sprintf("Error getting active endpoint: %v", err) - a.logger.Errorf(msg) + a.Logger.Errorf(msg) http.Error(w, msg, int(status)) - return } @@ -55,7 +53,7 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } proxy := httputil.NewSingleHostReverseProxy(target) - proxy.Transport = a.transport + proxy.Transport = a.Transport // TODO: Clear the host to avoid 404's. // https://github.com/knative/serving/issues/964 diff --git a/pkg/activator/handler_test.go b/pkg/activator/handler/handler_test.go similarity index 75% rename from pkg/activator/handler_test.go rename to pkg/activator/handler/handler_test.go index db3edaf05c6e..d95ce60aeadd 100644 --- a/pkg/activator/handler_test.go +++ b/pkg/activator/handler/handler_test.go @@ -10,7 +10,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package activator +package handler import ( "errors" @@ -23,43 +23,42 @@ import ( "strconv" "testing" - "github.com/knative/serving/cmd/util" - "github.com/knative/serving/pkg/controller" - "go.uber.org/zap" + . "github.com/knative/pkg/logging/testing" + "github.com/knative/serving/pkg/activator" + "github.com/knative/serving/pkg/activator/util" + "github.com/knative/serving/pkg/reconciler" ) type stubActivator struct { - endpoint Endpoint + endpoint activator.Endpoint namespace string name string } -func newStubActivator(namespace string, name string, server *httptest.Server) Activator { +func newStubActivator(namespace string, name string, server *httptest.Server) activator.Activator { url, _ := url.Parse(server.URL) host := url.Hostname() port, _ := strconv.Atoi(url.Port()) return &stubActivator{ - endpoint: Endpoint{FQDN: host, Port: int32(port)}, + endpoint: activator.Endpoint{FQDN: host, Port: int32(port)}, namespace: namespace, name: name, } } -func (fa *stubActivator) ActiveEndpoint(namespace, name string) (Endpoint, Status, error) { +func (fa *stubActivator) ActiveEndpoint(namespace, configuration, name string) (activator.Endpoint, activator.Status, error) { if namespace == fa.namespace && name == fa.name { return fa.endpoint, http.StatusOK, nil } - return Endpoint{}, http.StatusNotFound, errors.New("not found!") + return activator.Endpoint{}, http.StatusNotFound, errors.New("not found!") } func (fa *stubActivator) Shutdown() { } func TestActivationHandler(t *testing.T) { - logger := zap.NewExample().Sugar() - errMsg := func(msg string) string { return fmt.Sprintf("Error getting active endpoint: %v\n", msg) } @@ -109,17 +108,7 @@ func TestActivationHandler(t *testing.T) { for _, e := range examples { t.Run(e.label, func(t *testing.T) { -<<<<<<< HEAD - rt := roundTripperFunc(func(r *http.Request) (*http.Response, error) { - if r.Host != "" { - t.Errorf("Unexpected request host. Want %q, got %q", "", r.Host) - } - -||||||| merged common ancestors - rt := roundTripperFunc(func(r *http.Request) (*http.Response, error) { -======= rt := util.RoundTripperFunc(func(r *http.Request) (*http.Response, error) { ->>>>>>> Move non-activator specific components from cmd/activator to cmd/util if e.wantErr != nil { return nil, e.wantErr } @@ -127,13 +116,17 @@ func TestActivationHandler(t *testing.T) { return http.DefaultTransport.RoundTrip(r) }) - handler := NewActivationHandler(act, rt, logger) + handler := ActivationHandler{ + Activator: act, + Transport: rt, + Logger: TestLogger(t), + } resp := httptest.NewRecorder() req := httptest.NewRequest("POST", "http://example.com", nil) - req.Header.Set(controller.GetRevisionHeaderNamespace(), e.namespace) - req.Header.Set(controller.GetRevisionHeaderName(), e.name) + req.Header.Set(reconciler.GetRevisionHeaderNamespace(), e.namespace) + req.Header.Set(reconciler.GetRevisionHeaderName(), e.name) handler.ServeHTTP(resp, req) @@ -147,4 +140,5 @@ func TestActivationHandler(t *testing.T) { } }) } + } diff --git a/pkg/activator/handler/reporting_handler.go b/pkg/activator/handler/reporting_handler.go new file mode 100644 index 000000000000..727301f580c7 --- /dev/null +++ b/pkg/activator/handler/reporting_handler.go @@ -0,0 +1,65 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "net/http" + "strconv" + "time" + + "github.com/knative/serving/pkg/activator" + "github.com/knative/serving/pkg/reconciler" +) + +// ReportingHTTPHandler will forward request & response metrics +// to the `activator.StatsReporter`` +type ReportingHTTPHandler struct { + NextHandler http.Handler + Reporter activator.StatsReporter +} + +func (h *ReportingHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace()) + name := r.Header.Get(reconciler.GetRevisionHeaderName()) + config := r.Header.Get(reconciler.GetConfigurationHeader()) + start := time.Now() + + capture := &statusCapture{ + ResponseWriter: w, + statusCode: http.StatusOK, + } + + h.NextHandler.ServeHTTP(capture, r) + + status := capture.statusCode + duration := time.Now().Sub(start) + + if numRetries := w.Header().Get(activator.ResponseCountHTTPHeader); numRetries != "" { + count, _ := strconv.Atoi(numRetries) + h.Reporter.ReportResponseCount(namespace, config, name, int(status), count, 1.0) + } + + h.Reporter.ReportResponseTime(namespace, config, name, int(status), duration) +} + +type statusCapture struct { + http.ResponseWriter + statusCode int +} + +func (s *statusCapture) WriteHeader(statusCode int) { + s.statusCode = statusCode + s.ResponseWriter.WriteHeader(statusCode) + +} diff --git a/pkg/activator/handler/reporting_handler_test.go b/pkg/activator/handler/reporting_handler_test.go new file mode 100644 index 000000000000..10a9b67f3192 --- /dev/null +++ b/pkg/activator/handler/reporting_handler_test.go @@ -0,0 +1,159 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/google/go-cmp/cmp" + + "github.com/knative/serving/pkg/activator" + + "github.com/knative/serving/pkg/reconciler" +) + +func TestReporterHandlerResponseReceived(t *testing.T) { + baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add(activator.ResponseCountHTTPHeader, "1234") + w.WriteHeader(http.StatusTeapot) + }) + + reporter := &fakeReporter{} + handler := ReportingHTTPHandler{NextHandler: baseHandler, Reporter: reporter} + + resp := httptest.NewRecorder() + req := httptest.NewRequest("POST", "http://example.com", nil) + + req.Header.Add(reconciler.GetRevisionHeaderNamespace(), "revision-namespace") + req.Header.Add(reconciler.GetRevisionHeaderName(), "revision-name") + req.Header.Add(reconciler.GetConfigurationHeader(), "configuration-name") + + handler.ServeHTTP(resp, req) + + want := []call{ + { + Op: "ReportResponseCount", + Namespace: "revision-namespace", + Revision: "revision-name", + Config: "configuration-name", + StatusCode: http.StatusTeapot, + Attempts: 1234, + Version: 1.0, + }, + { + Op: "ReportResponseTime", + Namespace: "revision-namespace", + Revision: "revision-name", + Config: "configuration-name", + StatusCode: http.StatusTeapot, + }, + } + + got := reporter.calls + + if diff := cmp.Diff(want, got, ignoreDurationOption); diff != "" { + t.Errorf("Reporting calls are different (-want, +got) = %v", diff) + } + + if got[1].Duration == 0 { + t.Errorf("Expected a ReportResponseTime duration > 0") + } +} + +func TestReporterHandlerCountHeaderMissing(t *testing.T) { + baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTeapot) + }) + + reporter := &fakeReporter{} + handler := ReportingHTTPHandler{NextHandler: baseHandler, Reporter: reporter} + + resp := httptest.NewRecorder() + req := httptest.NewRequest("POST", "http://example.com", nil) + + req.Header.Add(reconciler.GetRevisionHeaderNamespace(), "revision-namespace") + req.Header.Add(reconciler.GetRevisionHeaderName(), "revision-name") + req.Header.Add(reconciler.GetConfigurationHeader(), "configuration-name") + + handler.ServeHTTP(resp, req) + + want := []call{ + { + Op: "ReportResponseTime", + Namespace: "revision-namespace", + Revision: "revision-name", + Config: "configuration-name", + StatusCode: http.StatusTeapot, + }, + } + + got := reporter.calls + + if diff := cmp.Diff(want, got, ignoreDurationOption); diff != "" { + t.Errorf("Reporting calls are different (-want, +got) = %v", diff) + } +} + +var ignoreDurationOption = cmpopts.IgnoreFields(call{}, "Duration") + +type call struct { + Op string + Namespace string + Config string + Revision string + StatusCode int + Attempts int + Version float64 + Duration time.Duration +} + +type fakeReporter struct { + calls []call +} + +func (f *fakeReporter) ReportRequest(ns, config, rev, servingState string, v float64) error { + return nil +} + +func (f *fakeReporter) ReportResponseCount(ns, config, rev string, responseCode, numTries int, v float64) error { + f.calls = append(f.calls, call{ + Op: "ReportResponseCount", + Namespace: ns, + Config: config, + Revision: rev, + StatusCode: responseCode, + Attempts: numTries, + Version: v, + }) + + return nil +} + +func (f *fakeReporter) ReportResponseTime(ns, config, rev string, responseCode int, d time.Duration) error { + f.calls = append(f.calls, call{ + Op: "ReportResponseTime", + Namespace: ns, + Config: config, + Revision: rev, + StatusCode: responseCode, + Duration: d, + }) + + return nil +} diff --git a/pkg/activator/util/coverage.out b/pkg/activator/util/coverage.out new file mode 100644 index 000000000000..7e7719c19046 --- /dev/null +++ b/pkg/activator/util/coverage.out @@ -0,0 +1,34 @@ +mode: set +github.com/knative/serving/pkg/activator/util/retryer.go:25.47,27.2 1 1 +github.com/knative/serving/pkg/activator/util/retryer.go:31.71,32.59 1 1 +github.com/knative/serving/pkg/activator/util/retryer.go:32.59,33.65 1 1 +github.com/knative/serving/pkg/activator/util/retryer.go:36.3,36.9 1 1 +github.com/knative/serving/pkg/activator/util/retryer.go:33.65,35.4 1 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:28.50,30.2 1 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:32.48,35.17 1 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:45.2,45.21 1 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:35.17,37.17 2 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:40.3,42.30 2 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:37.17,39.4 1 0 +github.com/knative/serving/pkg/activator/util/rewinder.go:48.34,53.2 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:28.79,30.2 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:33.85,34.72 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:34.72,36.24 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:40.3,40.24 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:36.24,38.4 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:50.40,51.40 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:51.40,53.3 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:64.117,71.2 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:73.91,79.44 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:97.2,97.16 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:109.2,109.8 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:79.44,82.17 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:87.3,87.44 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:94.3,94.15 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:82.17,85.4 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:87.44,88.23 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:88.23,91.5 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:97.16,100.25 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:104.3,104.77 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:100.25,102.4 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:105.8,107.3 1 1 diff --git a/pkg/activator/util/io_test.go b/pkg/activator/util/io_test.go new file mode 100644 index 000000000000..6c4ed61b1568 --- /dev/null +++ b/pkg/activator/util/io_test.go @@ -0,0 +1,35 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package util + +import "io" + +type spyReadCloser struct { + io.Reader + Closed bool + ReadAfterClose bool +} + +func (s *spyReadCloser) Read(b []byte) (n int, err error) { + if s.Closed { + s.ReadAfterClose = true + } + + return s.Reader.Read(b) +} + +func (s *spyReadCloser) Close() error { + s.Closed = true + + return nil +} diff --git a/pkg/activator/util/retryer.go b/pkg/activator/util/retryer.go new file mode 100644 index 000000000000..7357ed8ea6bd --- /dev/null +++ b/pkg/activator/util/retryer.go @@ -0,0 +1,38 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import "time" + +type Retryer interface { + Retry(func() bool) int +} + +type ActionFunc func() bool +type RetryerFunc func(ActionFunc) int + +func (r RetryerFunc) Retry(f func() bool) int { + return r(f) +} + +// NewLinearRetryer will return a retryer that retries `action` up to +// `maxRetries` times with `interval` delay between retries +func NewLinearRetryer(interval time.Duration, maxRetries int) Retryer { + return RetryerFunc(func(action ActionFunc) (retries int) { + for retries = 1; !action() && retries < maxRetries; retries++ { + time.Sleep(interval) + } + return + }) +} diff --git a/pkg/activator/util/retryer_test.go b/pkg/activator/util/retryer_test.go new file mode 100644 index 000000000000..5f669097c087 --- /dev/null +++ b/pkg/activator/util/retryer_test.go @@ -0,0 +1,95 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package util + +import ( + "testing" + "time" +) + +func TestLinearRetry(t *testing.T) { + checkInterval := func(last *time.Time, want time.Duration) { + now := time.Now() + got := now.Sub(*last) + *last = now + + if got < want { + t.Errorf("Unexpected retry interval. Want %v, got %v", want, got) + } + } + + examples := []struct { + label string + interval time.Duration + maxRetries int + responses []bool + wantRetries int + }{ + { + label: "atleast once", + interval: 5 * time.Millisecond, + maxRetries: 0, + responses: []bool{true}, + wantRetries: 1, + }, + { + label: "< maxRetries", + interval: 5 * time.Millisecond, + maxRetries: 3, + responses: []bool{false, true}, + wantRetries: 2, + }, + { + label: "= maxRetries", + interval: 10 * time.Millisecond, + maxRetries: 3, + responses: []bool{false, false, true}, + wantRetries: 3, + }, + { + label: "> maxRetries", + interval: 5 * time.Millisecond, + maxRetries: 3, + responses: []bool{false, false, false, true}, + wantRetries: 3, + }, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + var lastRetry time.Time + var got int + + a := func() bool { + checkInterval(&lastRetry, e.interval) + + ok := e.responses[got] + got++ + + return ok + } + + lr := NewLinearRetryer(e.interval, e.maxRetries) + + reported := lr.Retry(a) + + if got != e.wantRetries { + t.Errorf("Unexpected retries. Want %d, got %d", e.wantRetries, got) + } + + if reported != e.wantRetries { + t.Errorf("Unexpected retries reported. Want %d, got %d", e.wantRetries, reported) + } + }) + } +} diff --git a/cmd/util/io.go b/pkg/activator/util/rewinder.go similarity index 91% rename from cmd/util/io.go rename to pkg/activator/util/rewinder.go index cdfc6cddf78d..bc37daf278f9 100644 --- a/cmd/util/io.go +++ b/pkg/activator/util/rewinder.go @@ -19,17 +19,6 @@ import ( "io/ioutil" ) -type SpyCloser struct { - io.Reader - Closed bool -} - -func (sc *SpyCloser) Close() error { - sc.Closed = true - - return nil -} - type rewinder struct { rc io.ReadCloser rs io.ReadSeeker diff --git a/cmd/util/io_test.go b/pkg/activator/util/rewinder_test.go similarity index 95% rename from cmd/util/io_test.go rename to pkg/activator/util/rewinder_test.go index 313a9571668a..f55f6be82122 100644 --- a/cmd/util/io_test.go +++ b/pkg/activator/util/rewinder_test.go @@ -20,7 +20,7 @@ import ( func TestRewinder(t *testing.T) { str := "test string" - rc := &SpyCloser{Reader: bytes.NewBufferString(str)} + rc := &spyReadCloser{Reader: bytes.NewBufferString(str)} rewinder := NewRewinder(rc) b1, err := ioutil.ReadAll(rewinder) diff --git a/cmd/util/transports.go b/pkg/activator/util/transports.go similarity index 71% rename from cmd/util/transports.go rename to pkg/activator/util/transports.go index dea0bb006caa..24481fe36994 100644 --- a/cmd/util/transports.go +++ b/pkg/activator/util/transports.go @@ -15,7 +15,9 @@ package util import ( "net/http" - "time" + "strconv" + + "github.com/knative/serving/pkg/activator" h2cutil "github.com/knative/serving/pkg/h2c" "go.uber.org/zap" @@ -27,10 +29,10 @@ func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return rt(r) } -// HttpTransport will use the appropriate transport for the request's http protocol version -func NewHttpTransport(v1 http.RoundTripper, v2 http.RoundTripper) http.RoundTripper { +// NewHttpTransport will use the appropriate transport for the request's HTTP protocol version +func NewHTTPTransport(v1 http.RoundTripper, v2 http.RoundTripper) http.RoundTripper { return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { - var t http.RoundTripper = v1 + t := v1 if r.ProtoMajor == 2 { t = v2 } @@ -39,28 +41,8 @@ func NewHttpTransport(v1 http.RoundTripper, v2 http.RoundTripper) http.RoundTrip }) } -// AutoTransport uses h2c for HTTPv2 requests and falls back to `http.DefaultTransport` for all others -var AutoTransport = NewHttpTransport(http.DefaultTransport, h2cutil.DefaultTransport) - -type Retryer interface { - Retry(func() bool) int -} - -type RetryerFunc func(func() bool) int - -func (r RetryerFunc) Retry(f func() bool) int { - return r(f) -} - -// LinearRetryer will retry `action` up to `maxRetries` times with `interval` delay between retries -func NewLinearRetryer(interval time.Duration, maxRetries int) Retryer { - return RetryerFunc(func(action func() bool) (retries int) { - for retries = 1; !action() && retries < maxRetries; retries++ { - time.Sleep(interval) - } - return - }) -} +// AutoTransport uses h2c for HTTP2 requests and falls back to `http.DefaultTransport` for all others +var AutoTransport = NewHTTPTransport(http.DefaultTransport, h2cutil.DefaultTransport) type RetryCond func(*http.Response) bool @@ -89,6 +71,11 @@ func NewRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, r Retryer, } func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (resp *http.Response, err error) { + // The request body cannot be read multiple times for retries. + // The workaround is to clone the request body into a byte reader + // so the body can be read multiple times. + r.Body = NewRewinder(r.Body) + attempts := rrt.retryer.Retry(func() bool { resp, err = rrt.transport.RoundTrip(r) @@ -107,9 +94,14 @@ func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (resp *http.Response, e return false }) - // TODO: add metrics for number of tries and the response code. if err == nil { rrt.logger.Infof("Finished after %d attempt(s). Response code: %d", attempts, resp.StatusCode) + + if resp.Header == nil { + resp.Header = make(http.Header) + } + + resp.Header.Add(activator.ResponseCountHTTPHeader, strconv.Itoa(attempts)) } else { rrt.logger.Errorf("Failed after %d attempts. Last error: %v", attempts, err) } diff --git a/cmd/util/transports_test.go b/pkg/activator/util/transports_test.go similarity index 52% rename from cmd/util/transports_test.go rename to pkg/activator/util/transports_test.go index 27e503978f38..1f12c74ac5d1 100644 --- a/cmd/util/transports_test.go +++ b/pkg/activator/util/transports_test.go @@ -15,13 +15,14 @@ package util import ( "errors" "net/http" + "strings" "testing" - "time" - "go.uber.org/zap" + . "github.com/knative/pkg/logging/testing" + "github.com/knative/serving/pkg/activator" ) -func TestHttpRoundTripper(t *testing.T) { +func TestHTTPRoundTripper(t *testing.T) { wants := map[string]bool{} frt := func(key string) http.RoundTripper { return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { @@ -31,7 +32,7 @@ func TestHttpRoundTripper(t *testing.T) { }) } - rt := NewHttpTransport(frt("v1"), frt("v2")) + rt := NewHTTPTransport(frt("v1"), frt("v2")) examples := []struct { label string @@ -39,12 +40,12 @@ func TestHttpRoundTripper(t *testing.T) { want string }{ { - label: "use default transport for http1", + label: "use default transport for HTTP1", protoMajor: 1, want: "v1", }, { - label: "use h2c transport for http2", + label: "use h2c transport for HTTP2", protoMajor: 2, want: "v2", }, @@ -73,18 +74,15 @@ func TestHttpRoundTripper(t *testing.T) { func TestRetryRoundTripper(t *testing.T) { req := &http.Request{} - goodStatus := 200 - badStatus1 := 500 - badStatus2 := 400 - resp := func(status int) *http.Response { - return &http.Response{StatusCode: status, Body: &SpyCloser{}} + return &http.Response{StatusCode: status, Body: &spyReadCloser{}} } someErr := errors.New("some error") - - logger := zap.NewExample().Sugar() - conditions := []RetryCond{RetryStatus(badStatus1), RetryStatus(badStatus2)} + conditions := []RetryCond{ + RetryStatus(http.StatusInternalServerError), + RetryStatus(http.StatusBadRequest), + } examples := []struct { label string @@ -96,7 +94,7 @@ func TestRetryRoundTripper(t *testing.T) { }{ { label: "no retry", - resp: resp(goodStatus), + resp: resp(http.StatusOK), err: nil, cond: conditions, wantRetry: false, @@ -104,7 +102,7 @@ func TestRetryRoundTripper(t *testing.T) { }, { label: "no conditions", - resp: resp(badStatus1), + resp: resp(http.StatusInternalServerError), err: nil, cond: []RetryCond{}, wantRetry: false, @@ -120,7 +118,7 @@ func TestRetryRoundTripper(t *testing.T) { }, { label: "retry on condition 1", - resp: resp(badStatus1), + resp: resp(http.StatusInternalServerError), err: nil, cond: conditions, wantRetry: true, @@ -128,7 +126,7 @@ func TestRetryRoundTripper(t *testing.T) { }, { label: "retry on condition 2", - resp: resp(badStatus2), + resp: resp(http.StatusBadRequest), err: nil, cond: conditions, wantRetry: true, @@ -142,8 +140,8 @@ func TestRetryRoundTripper(t *testing.T) { return e.resp, e.err }) - retry := RetryerFunc(func(a func() bool) int { - if a() { + retry := RetryerFunc(func(action ActionFunc) int { + if action() { if !e.wantRetry { t.Errorf("Unexpected retry.") } @@ -151,98 +149,56 @@ func TestRetryRoundTripper(t *testing.T) { return 1 }) - rt := NewRetryRoundTripper(transport, logger, retry, e.cond...) + rt := NewRetryRoundTripper( + transport, + TestLogger(t), + retry, + e.cond..., + ) - gotResp, gotErr := rt.RoundTrip(req) + resp, err := rt.RoundTrip(req) - if gotResp != e.resp { - t.Errorf("Unexpected response. Want %v, got %v", e.resp, gotResp) + if resp != e.resp { + t.Errorf("Unexpected response. Want %v, got %v", e.resp, resp) } - if gotErr != e.err { - t.Errorf("Unexpected error. Want %v, got %v", e.err, gotErr) + if err != e.err { + t.Errorf("Unexpected error. Want %v, got %v", e.err, err) } - if e.wantBodyClosed && !e.resp.Body.(*SpyCloser).Closed { + if e.wantBodyClosed && !e.resp.Body.(*spyReadCloser).Closed { t.Errorf("Expected response body to be closed.") } + + if resp != nil { + if got, want := resp.Header.Get(activator.ResponseCountHTTPHeader), "1"; got != want { + t.Errorf("Expected retry header not the same got: %q want: %q", got, want) + } + } }) } } -func TestLinearRetry(t *testing.T) { - checkInterval := func(last *time.Time, want time.Duration) { - now := time.Now() - got := now.Sub(*last) - *last = now - - if got < want { - t.Errorf("Unexpected retry interval. Want %v, got %v", want, got) - } - } - - examples := []struct { - label string - interval time.Duration - maxRetries int - responses []bool - wantRetries int - }{ - { - label: "atleast once", - interval: 5 * time.Millisecond, - maxRetries: 0, - responses: []bool{true}, - wantRetries: 1, - }, - { - label: "< maxRetries", - interval: 5 * time.Millisecond, - maxRetries: 3, - responses: []bool{false, true}, - wantRetries: 2, - }, - { - label: "= maxRetries", - interval: 10 * time.Millisecond, - maxRetries: 3, - responses: []bool{false, false, true}, - wantRetries: 3, - }, - { - label: "> maxRetries", - interval: 5 * time.Millisecond, - maxRetries: 3, - responses: []bool{false, false, false, true}, - wantRetries: 3, - }, - } - - for _, e := range examples { - t.Run(e.label, func(t *testing.T) { - var lastRetry time.Time - var got int +func TestRetryRoundTripperRewind(t *testing.T) { + retry := RetryerFunc(func(action ActionFunc) int { + action() + action() + return 2 + }) - a := func() bool { - checkInterval(&lastRetry, e.interval) + rt := NewRetryRoundTripper( + http.DefaultTransport, + TestLogger(t), + retry, + RetryStatus(http.StatusInternalServerError), + ) - ok := e.responses[got] - got++ + spy := &spyReadCloser{Reader: strings.NewReader("request body")} + req, _ := http.NewRequest("POST", "http://knative.dev/test/", spy) - return ok - } - - lr := NewLinearRetryer(e.interval, e.maxRetries) + rt.RoundTrip(req) - reported := lr.Retry(a) - - if got != e.wantRetries { - t.Errorf("Unexpected retries. Want %d, got %d", e.wantRetries, got) - } - - if reported != e.wantRetries { - t.Errorf("Unexpected retries reported. Want %d, got %d", e.wantRetries, reported) - } - }) + if spy.ReadAfterClose { + t.Fatal("The retry round tripper read the request body more than once") } }