diff --git a/cmd/activator/main.go b/cmd/activator/main.go index b8b07187a1e1..be4a39dcfcdc 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -17,15 +17,9 @@ limitations under the License. package main import ( - "bytes" "flag" - "fmt" - "io" - "io/ioutil" "log" "net/http" - "net/http/httputil" - "net/url" "time" "github.com/knative/pkg/logging/logkey" @@ -33,10 +27,11 @@ import ( "github.com/knative/pkg/configmap" "github.com/knative/pkg/signals" "github.com/knative/serving/pkg/activator" + activatorhandler "github.com/knative/serving/pkg/activator/handler" + activatorutil "github.com/knative/serving/pkg/activator/util" clientset "github.com/knative/serving/pkg/client/clientset/versioned" - h2cutil "github.com/knative/serving/pkg/h2c" + "github.com/knative/serving/pkg/logging" - "github.com/knative/serving/pkg/reconciler" "github.com/knative/serving/pkg/system" "github.com/knative/serving/third_party/h2c" "go.opencensus.io/exporter/prometheus" @@ -48,123 +43,11 @@ import ( const ( maxUploadBytes = 32e6 // 32MB - same as app engine - maxRetry = 60 + maxRetries = 60 retryInterval = 1 * time.Second logLevelKey = "activator" ) -type activationHandler struct { - act activator.Activator - logger *zap.SugaredLogger - reporter activator.StatsReporter -} - -// retryRoundTripper retries on 503's for up to 60 seconds. The reason is there is -// a small delay for k8s to include the ready IP in service. -// https://github.com/knative/serving/issues/660#issuecomment-384062553 -type retryRoundTripper struct { - logger *zap.SugaredLogger - reporter activator.StatsReporter - start time.Time -} - -func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - var err error - var reqBody *bytes.Reader - - transport := http.DefaultTransport - - if r.ProtoMajor == 2 { - transport = h2cutil.NewTransport() - } - - if r.Body != nil { - reqBytes, err := ioutil.ReadAll(r.Body) - - if err != nil { - rrt.logger.Errorf("Error reading request body: %s", err) - return nil, err - } - - reqBody = bytes.NewReader(reqBytes) - r.Body = ioutil.NopCloser(reqBody) - } - - resp, err := transport.RoundTrip(r) - // TODO: Activator should retry with backoff. - // https://github.com/knative/serving/issues/1229 - i := 1 - for ; i < maxRetry; i++ { - if err == nil && resp != nil && resp.StatusCode != 503 { - break - } - - if err != nil { - rrt.logger.Errorf("Error making a request: %s", err) - } - - if resp != nil { - resp.Body.Close() - } - - time.Sleep(retryInterval) - - // The request body cannot be read multiple times for retries. - // The workaround is to clone the request body into a byte reader - // so the body can be read multiple times. - if r.Body != nil { - reqBody.Seek(0, io.SeekStart) - } - - resp, err = transport.RoundTrip(r) - } - - if resp != nil { - rrt.logger.Infof("It took %d tries to get response code %d", i, resp.StatusCode) - namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace()) - name := r.Header.Get(reconciler.GetRevisionHeaderName()) - config := r.Header.Get(reconciler.GetConfigurationHeader()) - rrt.reporter.ReportResponseCount(namespace, config, name, resp.StatusCode, i, 1.0) - rrt.reporter.ReportResponseTime(namespace, config, name, resp.StatusCode, time.Now().Sub(rrt.start)) - } - return resp, err -} - -func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) { - namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace()) - name := r.Header.Get(reconciler.GetRevisionHeaderName()) - config := r.Header.Get(reconciler.GetConfigurationHeader()) - start := time.Now() - - if r.ContentLength > maxUploadBytes { - w.WriteHeader(http.StatusRequestEntityTooLarge) - a.reporter.ReportResponseCount(namespace, config, name, http.StatusRequestEntityTooLarge, 1, 1.0) - a.reporter.ReportResponseTime(namespace, config, name, http.StatusRequestEntityTooLarge, time.Now().Sub(start)) - return - } - - endpoint, status, err := a.act.ActiveEndpoint(namespace, config, name) - if err != nil { - msg := fmt.Sprintf("Error getting active endpoint: %v", err) - http.Error(w, msg, int(status)) - a.logger.Errorf(msg) - a.reporter.ReportResponseCount(namespace, config, name, int(status), 1, 1.0) - a.reporter.ReportResponseTime(namespace, config, name, int(status), time.Now().Sub(start)) - return - } - target := &url.URL{ - Scheme: "http", - Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port), - } - proxy := httputil.NewSingleHostReverseProxy(target) - proxy.Transport = &retryRoundTripper{ - logger: a.logger, - reporter: a.reporter, - start: start, - } - proxy.ServeHTTP(w, r) -} - func main() { flag.Parse() cm, err := configmap.Load("/etc/config-logging") @@ -208,7 +91,26 @@ func main() { a := activator.NewRevisionActivator(kubeClient, servingClient, logger, reporter) a = activator.NewDedupingActivator(a) - ah := &activationHandler{a, logger, reporter} + + // Retry on 503's for up to 60 seconds. The reason is there is + // a small delay for k8s to include the ready IP in service. + // https://github.com/knative/serving/issues/660#issuecomment-384062553 + shouldRetry := activatorutil.RetryStatus(http.StatusServiceUnavailable) + retryer := activatorutil.NewLinearRetryer(retryInterval, maxRetries) + + rt := activatorutil.NewRetryRoundTripper(activatorutil.AutoTransport, logger, retryer, shouldRetry) + + ah := &activatorhandler.ReportingHTTPHandler{ + Reporter: reporter, + NextHandler: &activatorhandler.EnforceMaxContentLengthHandler{ + MaxContentLengthBytes: maxUploadBytes, + NextHandler: &activatorhandler.ActivationHandler{ + Activator: a, + Transport: rt, + Logger: logger, + }, + }, + } // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() @@ -226,7 +128,7 @@ func main() { // Start the endpoint for Prometheus scraping mux := http.NewServeMux() - mux.HandleFunc("/", ah.handler) + mux.HandleFunc("/", ah.ServeHTTP) mux.Handle("/metrics", promExporter) h2c.ListenAndServe(":8080", mux) } diff --git a/cmd/queue/main.go b/cmd/queue/main.go index b9facf56b216..e5bf49b7f6cb 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -284,7 +284,7 @@ func main() { httpProxy = httputil.NewSingleHostReverseProxy(target) h2cProxy = httputil.NewSingleHostReverseProxy(target) - h2cProxy.Transport = h2cutil.NewTransport() + h2cProxy.Transport = h2cutil.DefaultTransport logger.Infof("Queue container is starting, concurrencyModel: %s", *concurrencyModel) config, err := rest.InClusterConfig() diff --git a/pkg/activator/activator.go b/pkg/activator/activator.go index 2ff3d28e23de..469131b034a4 100644 --- a/pkg/activator/activator.go +++ b/pkg/activator/activator.go @@ -17,7 +17,8 @@ package activator const ( // The name of the activator service. - K8sServiceName = "activator-service" + K8sServiceName = "activator-service" + ResponseCountHTTPHeader = "X-Activator-Num-Retries" ) // Status is an HTTP status code. diff --git a/pkg/activator/handler/coverage.out b/pkg/activator/handler/coverage.out new file mode 100644 index 000000000000..18bcbc777ce2 --- /dev/null +++ b/pkg/activator/handler/coverage.out @@ -0,0 +1,12 @@ +mode: set +github.com/knative/serving/pkg/activator/handler/enforce_length_handler.go:26.92,27.47 1 1 +github.com/knative/serving/pkg/activator/handler/enforce_length_handler.go:32.2,32.31 1 1 +github.com/knative/serving/pkg/activator/handler/enforce_length_handler.go:27.47,30.3 2 1 +github.com/knative/serving/pkg/activator/handler/handler.go:35.90,37.2 1 1 +github.com/knative/serving/pkg/activator/handler/handler.go:39.79,46.16 5 1 +github.com/knative/serving/pkg/activator/handler/handler.go:54.2,66.23 5 1 +github.com/knative/serving/pkg/activator/handler/handler.go:46.16,52.3 4 1 +github.com/knative/serving/pkg/activator/handler/reporting_handler.go:30.82,46.87 9 1 +github.com/knative/serving/pkg/activator/handler/reporting_handler.go:51.2,51.79 1 1 +github.com/knative/serving/pkg/activator/handler/reporting_handler.go:46.87,49.3 2 1 +github.com/knative/serving/pkg/activator/handler/reporting_handler.go:59.53,63.2 2 1 diff --git a/pkg/activator/handler/enforce_length_handler.go b/pkg/activator/handler/enforce_length_handler.go new file mode 100644 index 000000000000..b4b2da84bae3 --- /dev/null +++ b/pkg/activator/handler/enforce_length_handler.go @@ -0,0 +1,33 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "net/http" +) + +// EnforceMaxContentLengthHandler prevents uploads larger than `MaxContentLengthBytes` +type EnforceMaxContentLengthHandler struct { + NextHandler http.Handler + MaxContentLengthBytes int64 +} + +func (h *EnforceMaxContentLengthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.ContentLength > h.MaxContentLengthBytes { + w.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + + h.NextHandler.ServeHTTP(w, r) +} diff --git a/pkg/activator/handler/enforce_length_handler_test.go b/pkg/activator/handler/enforce_length_handler_test.go new file mode 100644 index 000000000000..b579bc3a54b9 --- /dev/null +++ b/pkg/activator/handler/enforce_length_handler_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package handler + +import ( + "bytes" + "net/http" + "net/http/httptest" + "testing" +) + +func TestEnforceMaxContentLengthHandler(t *testing.T) { + payload := "SAMPLE PAYLOAD" + + examples := []struct { + label string + maxUpload int + status int + }{ + { + label: "under", + maxUpload: len(payload) + 1, + status: http.StatusOK, + }, + { + label: "equal", + maxUpload: len(payload), + status: http.StatusOK, + }, + { + label: "over", + maxUpload: len(payload) - 1, + status: http.StatusRequestEntityTooLarge, + }, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + handler := EnforceMaxContentLengthHandler{NextHandler: baseHandler, MaxContentLengthBytes: int64(e.maxUpload)} + + resp := httptest.NewRecorder() + req := httptest.NewRequest("POST", "http://example.com", bytes.NewBufferString(payload)) + + handler.ServeHTTP(resp, req) + + if resp.Code != e.status { + t.Errorf("Unexpected response status for payload %q. Want %d, got %d", payload, e.status, resp.Code) + } + }) + } +} diff --git a/pkg/activator/handler/handler.go b/pkg/activator/handler/handler.go new file mode 100644 index 000000000000..d79464f0976b --- /dev/null +++ b/pkg/activator/handler/handler.go @@ -0,0 +1,63 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "fmt" + "net/http" + "net/http/httputil" + "net/url" + + "github.com/knative/serving/pkg/activator" + "github.com/knative/serving/pkg/reconciler" + "go.uber.org/zap" +) + +// ActivationHandler will wait for an active endpoint for a revision +// to be available before proxing the request +type ActivationHandler struct { + Activator activator.Activator + Logger *zap.SugaredLogger + Transport http.RoundTripper +} + +func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace()) + name := r.Header.Get(reconciler.GetRevisionHeaderName()) + config := r.Header.Get(reconciler.GetConfigurationHeader()) + + endpoint, status, err := a.Activator.ActiveEndpoint(namespace, config, name) + + if err != nil { + msg := fmt.Sprintf("Error getting active endpoint: %v", err) + + a.Logger.Errorf(msg) + http.Error(w, msg, int(status)) + return + } + + target := &url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port), + } + + proxy := httputil.NewSingleHostReverseProxy(target) + proxy.Transport = a.Transport + + // TODO: Clear the host to avoid 404's. + // https://github.com/knative/serving/issues/964 + r.Host = "" + + proxy.ServeHTTP(w, r) +} diff --git a/pkg/activator/handler/handler_test.go b/pkg/activator/handler/handler_test.go new file mode 100644 index 000000000000..d95ce60aeadd --- /dev/null +++ b/pkg/activator/handler/handler_test.go @@ -0,0 +1,144 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package handler + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + + . "github.com/knative/pkg/logging/testing" + "github.com/knative/serving/pkg/activator" + "github.com/knative/serving/pkg/activator/util" + "github.com/knative/serving/pkg/reconciler" +) + +type stubActivator struct { + endpoint activator.Endpoint + namespace string + name string +} + +func newStubActivator(namespace string, name string, server *httptest.Server) activator.Activator { + url, _ := url.Parse(server.URL) + host := url.Hostname() + port, _ := strconv.Atoi(url.Port()) + + return &stubActivator{ + endpoint: activator.Endpoint{FQDN: host, Port: int32(port)}, + namespace: namespace, + name: name, + } +} + +func (fa *stubActivator) ActiveEndpoint(namespace, configuration, name string) (activator.Endpoint, activator.Status, error) { + if namespace == fa.namespace && name == fa.name { + return fa.endpoint, http.StatusOK, nil + } + + return activator.Endpoint{}, http.StatusNotFound, errors.New("not found!") +} + +func (fa *stubActivator) Shutdown() { +} + +func TestActivationHandler(t *testing.T) { + errMsg := func(msg string) string { + return fmt.Sprintf("Error getting active endpoint: %v\n", msg) + } + + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, "everything good!") + }), + ) + defer server.Close() + + act := newStubActivator("real-namespace", "real-name", server) + + examples := []struct { + label string + namespace string + name string + wantBody string + wantCode int + wantErr error + }{ + { + label: "active endpoint", + namespace: "real-namespace", + name: "real-name", + wantBody: "everything good!", + wantCode: http.StatusOK, + wantErr: nil, + }, + { + label: "no active endpoint", + namespace: "fake-namespace", + name: "fake-name", + wantBody: errMsg("not found!"), + wantCode: http.StatusNotFound, + wantErr: nil, + }, + { + label: "request error", + namespace: "real-namespace", + name: "real-name", + wantBody: "", + wantCode: http.StatusBadGateway, + wantErr: errors.New("request error!"), + }, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + rt := util.RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + if e.wantErr != nil { + return nil, e.wantErr + } + + return http.DefaultTransport.RoundTrip(r) + }) + + handler := ActivationHandler{ + Activator: act, + Transport: rt, + Logger: TestLogger(t), + } + + resp := httptest.NewRecorder() + + req := httptest.NewRequest("POST", "http://example.com", nil) + req.Header.Set(reconciler.GetRevisionHeaderNamespace(), e.namespace) + req.Header.Set(reconciler.GetRevisionHeaderName(), e.name) + + handler.ServeHTTP(resp, req) + + if resp.Code != e.wantCode { + t.Errorf("Unexpected response status. Want %d, got %d", e.wantCode, resp.Code) + } + + gotBody, _ := ioutil.ReadAll(resp.Body) + if string(gotBody) != e.wantBody { + t.Errorf("Unexpected response body. Want %q, got %q", e.wantBody, gotBody) + } + }) + } + +} diff --git a/pkg/activator/handler/reporting_handler.go b/pkg/activator/handler/reporting_handler.go new file mode 100644 index 000000000000..727301f580c7 --- /dev/null +++ b/pkg/activator/handler/reporting_handler.go @@ -0,0 +1,65 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "net/http" + "strconv" + "time" + + "github.com/knative/serving/pkg/activator" + "github.com/knative/serving/pkg/reconciler" +) + +// ReportingHTTPHandler will forward request & response metrics +// to the `activator.StatsReporter`` +type ReportingHTTPHandler struct { + NextHandler http.Handler + Reporter activator.StatsReporter +} + +func (h *ReportingHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace()) + name := r.Header.Get(reconciler.GetRevisionHeaderName()) + config := r.Header.Get(reconciler.GetConfigurationHeader()) + start := time.Now() + + capture := &statusCapture{ + ResponseWriter: w, + statusCode: http.StatusOK, + } + + h.NextHandler.ServeHTTP(capture, r) + + status := capture.statusCode + duration := time.Now().Sub(start) + + if numRetries := w.Header().Get(activator.ResponseCountHTTPHeader); numRetries != "" { + count, _ := strconv.Atoi(numRetries) + h.Reporter.ReportResponseCount(namespace, config, name, int(status), count, 1.0) + } + + h.Reporter.ReportResponseTime(namespace, config, name, int(status), duration) +} + +type statusCapture struct { + http.ResponseWriter + statusCode int +} + +func (s *statusCapture) WriteHeader(statusCode int) { + s.statusCode = statusCode + s.ResponseWriter.WriteHeader(statusCode) + +} diff --git a/pkg/activator/handler/reporting_handler_test.go b/pkg/activator/handler/reporting_handler_test.go new file mode 100644 index 000000000000..10a9b67f3192 --- /dev/null +++ b/pkg/activator/handler/reporting_handler_test.go @@ -0,0 +1,159 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/google/go-cmp/cmp" + + "github.com/knative/serving/pkg/activator" + + "github.com/knative/serving/pkg/reconciler" +) + +func TestReporterHandlerResponseReceived(t *testing.T) { + baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add(activator.ResponseCountHTTPHeader, "1234") + w.WriteHeader(http.StatusTeapot) + }) + + reporter := &fakeReporter{} + handler := ReportingHTTPHandler{NextHandler: baseHandler, Reporter: reporter} + + resp := httptest.NewRecorder() + req := httptest.NewRequest("POST", "http://example.com", nil) + + req.Header.Add(reconciler.GetRevisionHeaderNamespace(), "revision-namespace") + req.Header.Add(reconciler.GetRevisionHeaderName(), "revision-name") + req.Header.Add(reconciler.GetConfigurationHeader(), "configuration-name") + + handler.ServeHTTP(resp, req) + + want := []call{ + { + Op: "ReportResponseCount", + Namespace: "revision-namespace", + Revision: "revision-name", + Config: "configuration-name", + StatusCode: http.StatusTeapot, + Attempts: 1234, + Version: 1.0, + }, + { + Op: "ReportResponseTime", + Namespace: "revision-namespace", + Revision: "revision-name", + Config: "configuration-name", + StatusCode: http.StatusTeapot, + }, + } + + got := reporter.calls + + if diff := cmp.Diff(want, got, ignoreDurationOption); diff != "" { + t.Errorf("Reporting calls are different (-want, +got) = %v", diff) + } + + if got[1].Duration == 0 { + t.Errorf("Expected a ReportResponseTime duration > 0") + } +} + +func TestReporterHandlerCountHeaderMissing(t *testing.T) { + baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTeapot) + }) + + reporter := &fakeReporter{} + handler := ReportingHTTPHandler{NextHandler: baseHandler, Reporter: reporter} + + resp := httptest.NewRecorder() + req := httptest.NewRequest("POST", "http://example.com", nil) + + req.Header.Add(reconciler.GetRevisionHeaderNamespace(), "revision-namespace") + req.Header.Add(reconciler.GetRevisionHeaderName(), "revision-name") + req.Header.Add(reconciler.GetConfigurationHeader(), "configuration-name") + + handler.ServeHTTP(resp, req) + + want := []call{ + { + Op: "ReportResponseTime", + Namespace: "revision-namespace", + Revision: "revision-name", + Config: "configuration-name", + StatusCode: http.StatusTeapot, + }, + } + + got := reporter.calls + + if diff := cmp.Diff(want, got, ignoreDurationOption); diff != "" { + t.Errorf("Reporting calls are different (-want, +got) = %v", diff) + } +} + +var ignoreDurationOption = cmpopts.IgnoreFields(call{}, "Duration") + +type call struct { + Op string + Namespace string + Config string + Revision string + StatusCode int + Attempts int + Version float64 + Duration time.Duration +} + +type fakeReporter struct { + calls []call +} + +func (f *fakeReporter) ReportRequest(ns, config, rev, servingState string, v float64) error { + return nil +} + +func (f *fakeReporter) ReportResponseCount(ns, config, rev string, responseCode, numTries int, v float64) error { + f.calls = append(f.calls, call{ + Op: "ReportResponseCount", + Namespace: ns, + Config: config, + Revision: rev, + StatusCode: responseCode, + Attempts: numTries, + Version: v, + }) + + return nil +} + +func (f *fakeReporter) ReportResponseTime(ns, config, rev string, responseCode int, d time.Duration) error { + f.calls = append(f.calls, call{ + Op: "ReportResponseTime", + Namespace: ns, + Config: config, + Revision: rev, + StatusCode: responseCode, + Duration: d, + }) + + return nil +} diff --git a/pkg/activator/util/coverage.out b/pkg/activator/util/coverage.out new file mode 100644 index 000000000000..7e7719c19046 --- /dev/null +++ b/pkg/activator/util/coverage.out @@ -0,0 +1,34 @@ +mode: set +github.com/knative/serving/pkg/activator/util/retryer.go:25.47,27.2 1 1 +github.com/knative/serving/pkg/activator/util/retryer.go:31.71,32.59 1 1 +github.com/knative/serving/pkg/activator/util/retryer.go:32.59,33.65 1 1 +github.com/knative/serving/pkg/activator/util/retryer.go:36.3,36.9 1 1 +github.com/knative/serving/pkg/activator/util/retryer.go:33.65,35.4 1 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:28.50,30.2 1 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:32.48,35.17 1 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:45.2,45.21 1 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:35.17,37.17 2 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:40.3,42.30 2 1 +github.com/knative/serving/pkg/activator/util/rewinder.go:37.17,39.4 1 0 +github.com/knative/serving/pkg/activator/util/rewinder.go:48.34,53.2 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:28.79,30.2 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:33.85,34.72 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:34.72,36.24 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:40.3,40.24 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:36.24,38.4 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:50.40,51.40 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:51.40,53.3 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:64.117,71.2 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:73.91,79.44 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:97.2,97.16 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:109.2,109.8 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:79.44,82.17 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:87.3,87.44 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:94.3,94.15 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:82.17,85.4 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:87.44,88.23 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:88.23,91.5 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:97.16,100.25 2 1 +github.com/knative/serving/pkg/activator/util/transports.go:104.3,104.77 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:100.25,102.4 1 1 +github.com/knative/serving/pkg/activator/util/transports.go:105.8,107.3 1 1 diff --git a/pkg/activator/util/io_test.go b/pkg/activator/util/io_test.go new file mode 100644 index 000000000000..6c4ed61b1568 --- /dev/null +++ b/pkg/activator/util/io_test.go @@ -0,0 +1,35 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package util + +import "io" + +type spyReadCloser struct { + io.Reader + Closed bool + ReadAfterClose bool +} + +func (s *spyReadCloser) Read(b []byte) (n int, err error) { + if s.Closed { + s.ReadAfterClose = true + } + + return s.Reader.Read(b) +} + +func (s *spyReadCloser) Close() error { + s.Closed = true + + return nil +} diff --git a/pkg/activator/util/retryer.go b/pkg/activator/util/retryer.go new file mode 100644 index 000000000000..7357ed8ea6bd --- /dev/null +++ b/pkg/activator/util/retryer.go @@ -0,0 +1,38 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import "time" + +type Retryer interface { + Retry(func() bool) int +} + +type ActionFunc func() bool +type RetryerFunc func(ActionFunc) int + +func (r RetryerFunc) Retry(f func() bool) int { + return r(f) +} + +// NewLinearRetryer will return a retryer that retries `action` up to +// `maxRetries` times with `interval` delay between retries +func NewLinearRetryer(interval time.Duration, maxRetries int) Retryer { + return RetryerFunc(func(action ActionFunc) (retries int) { + for retries = 1; !action() && retries < maxRetries; retries++ { + time.Sleep(interval) + } + return + }) +} diff --git a/pkg/activator/util/retryer_test.go b/pkg/activator/util/retryer_test.go new file mode 100644 index 000000000000..5f669097c087 --- /dev/null +++ b/pkg/activator/util/retryer_test.go @@ -0,0 +1,95 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package util + +import ( + "testing" + "time" +) + +func TestLinearRetry(t *testing.T) { + checkInterval := func(last *time.Time, want time.Duration) { + now := time.Now() + got := now.Sub(*last) + *last = now + + if got < want { + t.Errorf("Unexpected retry interval. Want %v, got %v", want, got) + } + } + + examples := []struct { + label string + interval time.Duration + maxRetries int + responses []bool + wantRetries int + }{ + { + label: "atleast once", + interval: 5 * time.Millisecond, + maxRetries: 0, + responses: []bool{true}, + wantRetries: 1, + }, + { + label: "< maxRetries", + interval: 5 * time.Millisecond, + maxRetries: 3, + responses: []bool{false, true}, + wantRetries: 2, + }, + { + label: "= maxRetries", + interval: 10 * time.Millisecond, + maxRetries: 3, + responses: []bool{false, false, true}, + wantRetries: 3, + }, + { + label: "> maxRetries", + interval: 5 * time.Millisecond, + maxRetries: 3, + responses: []bool{false, false, false, true}, + wantRetries: 3, + }, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + var lastRetry time.Time + var got int + + a := func() bool { + checkInterval(&lastRetry, e.interval) + + ok := e.responses[got] + got++ + + return ok + } + + lr := NewLinearRetryer(e.interval, e.maxRetries) + + reported := lr.Retry(a) + + if got != e.wantRetries { + t.Errorf("Unexpected retries. Want %d, got %d", e.wantRetries, got) + } + + if reported != e.wantRetries { + t.Errorf("Unexpected retries reported. Want %d, got %d", e.wantRetries, reported) + } + }) + } +} diff --git a/pkg/activator/util/rewinder.go b/pkg/activator/util/rewinder.go new file mode 100644 index 000000000000..bc37daf278f9 --- /dev/null +++ b/pkg/activator/util/rewinder.go @@ -0,0 +1,53 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "bytes" + "io" + "io/ioutil" +) + +type rewinder struct { + rc io.ReadCloser + rs io.ReadSeeker +} + +// rewinder wraps a single-use `ReadCloser` into a `ReadCloser` that can be read multiple times +func NewRewinder(rc io.ReadCloser) io.ReadCloser { + return &rewinder{rc: rc} +} + +func (r *rewinder) Read(b []byte) (int, error) { + // On the first `Read()`, the contents of `rc` is read into a buffer `rs`. + // This buffer is used for all subsequent reads + if r.rs == nil { + buf, err := ioutil.ReadAll(r.rc) + if err != nil { + return 0, err + } + r.rc.Close() + + r.rs = bytes.NewReader(buf) + } + + return r.rs.Read(b) +} + +func (r *rewinder) Close() error { + // Rewind the buffer on `Close()` for the next call to `Read` + r.rs.Seek(0, io.SeekStart) + + return nil +} diff --git a/pkg/activator/util/rewinder_test.go b/pkg/activator/util/rewinder_test.go new file mode 100644 index 000000000000..f55f6be82122 --- /dev/null +++ b/pkg/activator/util/rewinder_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package util + +import ( + "bytes" + "io/ioutil" + "testing" +) + +func TestRewinder(t *testing.T) { + str := "test string" + rc := &spyReadCloser{Reader: bytes.NewBufferString(str)} + rewinder := NewRewinder(rc) + + b1, err := ioutil.ReadAll(rewinder) + if err != nil { + t.Errorf("Unexpected error reading b1: %v", err) + } + rewinder.Close() + + b2, err := ioutil.ReadAll(rewinder) + if err != nil { + t.Errorf("Unexpected error reading b2: %v", err) + } + rewinder.Close() + + if string(b1) != str { + t.Errorf("Unexpected str b1. Want %q, got %q", str, b1) + } + + if string(b2) != str { + t.Errorf("Unexpected str b2. Want %q, got %q", str, b2) + } + + if !rc.Closed { + t.Errorf("Expected ReadCloser to be closed") + } +} diff --git a/pkg/activator/util/transports.go b/pkg/activator/util/transports.go new file mode 100644 index 000000000000..24481fe36994 --- /dev/null +++ b/pkg/activator/util/transports.go @@ -0,0 +1,110 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "net/http" + "strconv" + + "github.com/knative/serving/pkg/activator" + + h2cutil "github.com/knative/serving/pkg/h2c" + "go.uber.org/zap" +) + +type RoundTripperFunc func(*http.Request) (*http.Response, error) + +func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return rt(r) +} + +// NewHttpTransport will use the appropriate transport for the request's HTTP protocol version +func NewHTTPTransport(v1 http.RoundTripper, v2 http.RoundTripper) http.RoundTripper { + return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + t := v1 + if r.ProtoMajor == 2 { + t = v2 + } + + return t.RoundTrip(r) + }) +} + +// AutoTransport uses h2c for HTTP2 requests and falls back to `http.DefaultTransport` for all others +var AutoTransport = NewHTTPTransport(http.DefaultTransport, h2cutil.DefaultTransport) + +type RetryCond func(*http.Response) bool + +// RetryStatus will filter responses matching `status` +func RetryStatus(status int) RetryCond { + return func(resp *http.Response) bool { + return resp.StatusCode == status + } +} + +type retryRoundTripper struct { + logger *zap.SugaredLogger + transport http.RoundTripper + retryer Retryer + retryConds []RetryCond +} + +// RetryRoundTripper retries a request on error or retry condition, using the given `retry` strategy +func NewRetryRoundTripper(rt http.RoundTripper, l *zap.SugaredLogger, r Retryer, sr ...RetryCond) http.RoundTripper { + return &retryRoundTripper{ + logger: l, + transport: rt, + retryer: r, + retryConds: sr, + } +} + +func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (resp *http.Response, err error) { + // The request body cannot be read multiple times for retries. + // The workaround is to clone the request body into a byte reader + // so the body can be read multiple times. + r.Body = NewRewinder(r.Body) + + attempts := rrt.retryer.Retry(func() bool { + resp, err = rrt.transport.RoundTrip(r) + + if err != nil { + rrt.logger.Errorf("Error making a request: %s", err) + return true + } + + for _, retryCond := range rrt.retryConds { + if retryCond(resp) { + resp.Body.Close() + return true + } + } + + return false + }) + + if err == nil { + rrt.logger.Infof("Finished after %d attempt(s). Response code: %d", attempts, resp.StatusCode) + + if resp.Header == nil { + resp.Header = make(http.Header) + } + + resp.Header.Add(activator.ResponseCountHTTPHeader, strconv.Itoa(attempts)) + } else { + rrt.logger.Errorf("Failed after %d attempts. Last error: %v", attempts, err) + } + + return +} diff --git a/pkg/activator/util/transports_test.go b/pkg/activator/util/transports_test.go new file mode 100644 index 000000000000..1f12c74ac5d1 --- /dev/null +++ b/pkg/activator/util/transports_test.go @@ -0,0 +1,204 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package util + +import ( + "errors" + "net/http" + "strings" + "testing" + + . "github.com/knative/pkg/logging/testing" + "github.com/knative/serving/pkg/activator" +) + +func TestHTTPRoundTripper(t *testing.T) { + wants := map[string]bool{} + frt := func(key string) http.RoundTripper { + return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + wants[key] = true + + return nil, nil + }) + } + + rt := NewHTTPTransport(frt("v1"), frt("v2")) + + examples := []struct { + label string + protoMajor int + want string + }{ + { + label: "use default transport for HTTP1", + protoMajor: 1, + want: "v1", + }, + { + label: "use h2c transport for HTTP2", + protoMajor: 2, + want: "v2", + }, + { + label: "use default transport for all others", + protoMajor: 99, + want: "v1", + }, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + wants[e.want] = false + + r := &http.Request{ProtoMajor: e.protoMajor} + + rt.RoundTrip(r) + + if wants[e.want] != true { + t.Error("Wrong transport selected for request.") + } + }) + } +} + +func TestRetryRoundTripper(t *testing.T) { + req := &http.Request{} + + resp := func(status int) *http.Response { + return &http.Response{StatusCode: status, Body: &spyReadCloser{}} + } + + someErr := errors.New("some error") + conditions := []RetryCond{ + RetryStatus(http.StatusInternalServerError), + RetryStatus(http.StatusBadRequest), + } + + examples := []struct { + label string + resp *http.Response + err error + cond []RetryCond + wantRetry bool + wantBodyClosed bool + }{ + { + label: "no retry", + resp: resp(http.StatusOK), + err: nil, + cond: conditions, + wantRetry: false, + wantBodyClosed: false, + }, + { + label: "no conditions", + resp: resp(http.StatusInternalServerError), + err: nil, + cond: []RetryCond{}, + wantRetry: false, + wantBodyClosed: false, + }, + { + label: "retry on error", + resp: nil, + err: someErr, + cond: conditions, + wantRetry: true, + wantBodyClosed: false, + }, + { + label: "retry on condition 1", + resp: resp(http.StatusInternalServerError), + err: nil, + cond: conditions, + wantRetry: true, + wantBodyClosed: true, + }, + { + label: "retry on condition 2", + resp: resp(http.StatusBadRequest), + err: nil, + cond: conditions, + wantRetry: true, + wantBodyClosed: true, + }, + } + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + transport := RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + return e.resp, e.err + }) + + retry := RetryerFunc(func(action ActionFunc) int { + if action() { + if !e.wantRetry { + t.Errorf("Unexpected retry.") + } + } + return 1 + }) + + rt := NewRetryRoundTripper( + transport, + TestLogger(t), + retry, + e.cond..., + ) + + resp, err := rt.RoundTrip(req) + + if resp != e.resp { + t.Errorf("Unexpected response. Want %v, got %v", e.resp, resp) + } + + if err != e.err { + t.Errorf("Unexpected error. Want %v, got %v", e.err, err) + } + + if e.wantBodyClosed && !e.resp.Body.(*spyReadCloser).Closed { + t.Errorf("Expected response body to be closed.") + } + + if resp != nil { + if got, want := resp.Header.Get(activator.ResponseCountHTTPHeader), "1"; got != want { + t.Errorf("Expected retry header not the same got: %q want: %q", got, want) + } + } + }) + } +} + +func TestRetryRoundTripperRewind(t *testing.T) { + retry := RetryerFunc(func(action ActionFunc) int { + action() + action() + return 2 + }) + + rt := NewRetryRoundTripper( + http.DefaultTransport, + TestLogger(t), + retry, + RetryStatus(http.StatusInternalServerError), + ) + + spy := &spyReadCloser{Reader: strings.NewReader("request body")} + req, _ := http.NewRequest("POST", "http://knative.dev/test/", spy) + + rt.RoundTrip(req) + + if spy.ReadAfterClose { + t.Fatal("The retry round tripper read the request body more than once") + } +} diff --git a/pkg/h2c/transport.go b/pkg/h2c/transport.go index 60b544cafc96..0b68f60e696c 100644 --- a/pkg/h2c/transport.go +++ b/pkg/h2c/transport.go @@ -18,4 +18,6 @@ func NewTransport() http.RoundTripper { return net.Dial(netw, addr) }, } -} \ No newline at end of file +} + +var DefaultTransport http.RoundTripper = NewTransport()