Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 25 additions & 123 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,21 @@ limitations under the License.
package main

import (
"bytes"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/http/httputil"
"net/url"
"time"

"github.com/knative/pkg/logging/logkey"

"github.com/knative/pkg/configmap"
"github.com/knative/pkg/signals"
"github.com/knative/serving/pkg/activator"
activatorhandler "github.com/knative/serving/pkg/activator/handler"
activatorutil "github.com/knative/serving/pkg/activator/util"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
h2cutil "github.com/knative/serving/pkg/h2c"

"github.com/knative/serving/pkg/logging"
"github.com/knative/serving/pkg/reconciler"
"github.com/knative/serving/pkg/system"
"github.com/knative/serving/third_party/h2c"
"go.opencensus.io/exporter/prometheus"
Expand All @@ -48,123 +43,11 @@ import (

const (
maxUploadBytes = 32e6 // 32MB - same as app engine
maxRetry = 60
maxRetries = 60
retryInterval = 1 * time.Second
logLevelKey = "activator"
)

type activationHandler struct {
act activator.Activator
logger *zap.SugaredLogger
reporter activator.StatsReporter
}

// retryRoundTripper retries on 503's for up to 60 seconds. The reason is there is
// a small delay for k8s to include the ready IP in service.
// https://github.com/knative/serving/issues/660#issuecomment-384062553
type retryRoundTripper struct {
logger *zap.SugaredLogger
reporter activator.StatsReporter
start time.Time
}

func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
var err error
var reqBody *bytes.Reader

transport := http.DefaultTransport

if r.ProtoMajor == 2 {
transport = h2cutil.NewTransport()
}

if r.Body != nil {
reqBytes, err := ioutil.ReadAll(r.Body)

if err != nil {
rrt.logger.Errorf("Error reading request body: %s", err)
return nil, err
}

reqBody = bytes.NewReader(reqBytes)
r.Body = ioutil.NopCloser(reqBody)
}

resp, err := transport.RoundTrip(r)
// TODO: Activator should retry with backoff.
// https://github.com/knative/serving/issues/1229
i := 1
for ; i < maxRetry; i++ {
if err == nil && resp != nil && resp.StatusCode != 503 {
break
}

if err != nil {
rrt.logger.Errorf("Error making a request: %s", err)
}

if resp != nil {
resp.Body.Close()
}

time.Sleep(retryInterval)

// The request body cannot be read multiple times for retries.
// The workaround is to clone the request body into a byte reader
// so the body can be read multiple times.
if r.Body != nil {
reqBody.Seek(0, io.SeekStart)
}

resp, err = transport.RoundTrip(r)
}

if resp != nil {
rrt.logger.Infof("It took %d tries to get response code %d", i, resp.StatusCode)
namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace())
name := r.Header.Get(reconciler.GetRevisionHeaderName())
config := r.Header.Get(reconciler.GetConfigurationHeader())
rrt.reporter.ReportResponseCount(namespace, config, name, resp.StatusCode, i, 1.0)
rrt.reporter.ReportResponseTime(namespace, config, name, resp.StatusCode, time.Now().Sub(rrt.start))
}
return resp, err
}

func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) {
namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace())
name := r.Header.Get(reconciler.GetRevisionHeaderName())
config := r.Header.Get(reconciler.GetConfigurationHeader())
start := time.Now()

if r.ContentLength > maxUploadBytes {
w.WriteHeader(http.StatusRequestEntityTooLarge)
a.reporter.ReportResponseCount(namespace, config, name, http.StatusRequestEntityTooLarge, 1, 1.0)
a.reporter.ReportResponseTime(namespace, config, name, http.StatusRequestEntityTooLarge, time.Now().Sub(start))
return
}

endpoint, status, err := a.act.ActiveEndpoint(namespace, config, name)
if err != nil {
msg := fmt.Sprintf("Error getting active endpoint: %v", err)
http.Error(w, msg, int(status))
a.logger.Errorf(msg)
a.reporter.ReportResponseCount(namespace, config, name, int(status), 1, 1.0)
a.reporter.ReportResponseTime(namespace, config, name, int(status), time.Now().Sub(start))
return
}
target := &url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port),
}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.Transport = &retryRoundTripper{
logger: a.logger,
reporter: a.reporter,
start: start,
}
proxy.ServeHTTP(w, r)
}

func main() {
flag.Parse()
cm, err := configmap.Load("/etc/config-logging")
Expand Down Expand Up @@ -208,7 +91,26 @@ func main() {

a := activator.NewRevisionActivator(kubeClient, servingClient, logger, reporter)
a = activator.NewDedupingActivator(a)
ah := &activationHandler{a, logger, reporter}

// Retry on 503's for up to 60 seconds. The reason is there is
// a small delay for k8s to include the ready IP in service.
// https://github.com/knative/serving/issues/660#issuecomment-384062553
shouldRetry := activatorutil.RetryStatus(http.StatusServiceUnavailable)
retryer := activatorutil.NewLinearRetryer(retryInterval, maxRetries)

rt := activatorutil.NewRetryRoundTripper(activatorutil.AutoTransport, logger, retryer, shouldRetry)

ah := &activatorhandler.ReportingHTTPHandler{
Reporter: reporter,
NextHandler: &activatorhandler.EnforceMaxContentLengthHandler{
MaxContentLengthBytes: maxUploadBytes,
NextHandler: &activatorhandler.ActivationHandler{
Activator: a,
Transport: rt,
Logger: logger,
},
},
}

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
Expand All @@ -226,7 +128,7 @@ func main() {

// Start the endpoint for Prometheus scraping
mux := http.NewServeMux()
mux.HandleFunc("/", ah.handler)
mux.HandleFunc("/", ah.ServeHTTP)
mux.Handle("/metrics", promExporter)
h2c.ListenAndServe(":8080", mux)
}
2 changes: 1 addition & 1 deletion cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func main() {

httpProxy = httputil.NewSingleHostReverseProxy(target)
h2cProxy = httputil.NewSingleHostReverseProxy(target)
h2cProxy.Transport = h2cutil.NewTransport()
h2cProxy.Transport = h2cutil.DefaultTransport

logger.Infof("Queue container is starting, concurrencyModel: %s", *concurrencyModel)
config, err := rest.InClusterConfig()
Expand Down
3 changes: 2 additions & 1 deletion pkg/activator/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package activator

const (
// The name of the activator service.
K8sServiceName = "activator-service"
K8sServiceName = "activator-service"
ResponseCountHTTPHeader = "X-Activator-Num-Retries"
)

// Status is an HTTP status code.
Expand Down
12 changes: 12 additions & 0 deletions pkg/activator/handler/coverage.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mode: set
github.com/knative/serving/pkg/activator/handler/enforce_length_handler.go:26.92,27.47 1 1
github.com/knative/serving/pkg/activator/handler/enforce_length_handler.go:32.2,32.31 1 1
github.com/knative/serving/pkg/activator/handler/enforce_length_handler.go:27.47,30.3 2 1
github.com/knative/serving/pkg/activator/handler/handler.go:35.90,37.2 1 1
github.com/knative/serving/pkg/activator/handler/handler.go:39.79,46.16 5 1
github.com/knative/serving/pkg/activator/handler/handler.go:54.2,66.23 5 1
github.com/knative/serving/pkg/activator/handler/handler.go:46.16,52.3 4 1
github.com/knative/serving/pkg/activator/handler/reporting_handler.go:30.82,46.87 9 1
github.com/knative/serving/pkg/activator/handler/reporting_handler.go:51.2,51.79 1 1
github.com/knative/serving/pkg/activator/handler/reporting_handler.go:46.87,49.3 2 1
github.com/knative/serving/pkg/activator/handler/reporting_handler.go:59.53,63.2 2 1
33 changes: 33 additions & 0 deletions pkg/activator/handler/enforce_length_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"net/http"
)

// EnforceMaxContentLengthHandler prevents uploads larger than `MaxContentLengthBytes`
type EnforceMaxContentLengthHandler struct {
NextHandler http.Handler
MaxContentLengthBytes int64
}

func (h *EnforceMaxContentLengthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.ContentLength > h.MaxContentLengthBytes {
w.WriteHeader(http.StatusRequestEntityTooLarge)
return
}

h.NextHandler.ServeHTTP(w, r)
}
64 changes: 64 additions & 0 deletions pkg/activator/handler/enforce_length_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package handler

import (
"bytes"
"net/http"
"net/http/httptest"
"testing"
)

func TestEnforceMaxContentLengthHandler(t *testing.T) {
payload := "SAMPLE PAYLOAD"

examples := []struct {
label string
maxUpload int
status int
}{
{
label: "under",
maxUpload: len(payload) + 1,
status: http.StatusOK,
},
{
label: "equal",
maxUpload: len(payload),
status: http.StatusOK,
},
{
label: "over",
maxUpload: len(payload) - 1,
status: http.StatusRequestEntityTooLarge,
},
}

for _, e := range examples {
t.Run(e.label, func(t *testing.T) {
baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
handler := EnforceMaxContentLengthHandler{NextHandler: baseHandler, MaxContentLengthBytes: int64(e.maxUpload)}

resp := httptest.NewRecorder()
req := httptest.NewRequest("POST", "http://example.com", bytes.NewBufferString(payload))

handler.ServeHTTP(resp, req)

if resp.Code != e.status {
t.Errorf("Unexpected response status for payload %q. Want %d, got %d", payload, e.status, resp.Code)
}
})
}
}
63 changes: 63 additions & 0 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"fmt"
"net/http"
"net/http/httputil"
"net/url"

"github.com/knative/serving/pkg/activator"
"github.com/knative/serving/pkg/reconciler"
"go.uber.org/zap"
)

// ActivationHandler will wait for an active endpoint for a revision
// to be available before proxing the request
type ActivationHandler struct {
Activator activator.Activator
Logger *zap.SugaredLogger
Transport http.RoundTripper
}

func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
namespace := r.Header.Get(reconciler.GetRevisionHeaderNamespace())
name := r.Header.Get(reconciler.GetRevisionHeaderName())
config := r.Header.Get(reconciler.GetConfigurationHeader())

endpoint, status, err := a.Activator.ActiveEndpoint(namespace, config, name)

if err != nil {
msg := fmt.Sprintf("Error getting active endpoint: %v", err)

a.Logger.Errorf(msg)
http.Error(w, msg, int(status))
return
}

target := &url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port),
}

proxy := httputil.NewSingleHostReverseProxy(target)
proxy.Transport = a.Transport

// TODO: Clear the host to avoid 404's.
// https://github.com/knative/serving/issues/964
r.Host = ""

proxy.ServeHTTP(w, r)
}
Loading