Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 50 additions & 15 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"github.com/knative/serving/pkg/logging"
"github.com/knative/serving/pkg/system"
"github.com/knative/serving/third_party/h2c"
"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -52,18 +54,21 @@ const (
)

type activationHandler struct {
act activator.Activator
logger *zap.SugaredLogger
act activator.Activator
logger *zap.SugaredLogger
reporter activator.StatsReporter
}

// retryRoundTripper retries on 503's for up to 60 seconds. The reason is there is
// a small delay for k8s to include the ready IP in service.
// https://github.com/knative/serving/issues/660#issuecomment-384062553
type retryRoundTripper struct {
logger *zap.SugaredLogger
logger *zap.SugaredLogger
reporter activator.StatsReporter
start time.Time
}

func (rrt retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
var err error
var reqBody *bytes.Reader

Expand Down Expand Up @@ -113,35 +118,49 @@ func (rrt retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error)

resp, err = transport.RoundTrip(r)
}
// TODO: add metrics for number of tries and the response code.

if resp != nil {
rrt.logger.Infof("It took %d tries to get response code %d", i, resp.StatusCode)
namespace := r.Header.Get(controller.GetRevisionHeaderNamespace())
name := r.Header.Get(controller.GetRevisionHeaderName())
config := r.Header.Get(controller.GetConfigurationHeader())
rrt.reporter.ReportResponseCount(namespace, config, name, resp.StatusCode, i, 1.0)
rrt.reporter.ReportResponseTime(namespace, config, name, resp.StatusCode, time.Now().Sub(rrt.start))
}
return resp, err
}

func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) {
namespace := r.Header.Get(controller.GetRevisionHeaderNamespace())
name := r.Header.Get(controller.GetRevisionHeaderName())
config := r.Header.Get(controller.GetConfigurationHeader())
start := time.Now()

if r.ContentLength > maxUploadBytes {
w.WriteHeader(http.StatusRequestEntityTooLarge)
a.reporter.ReportResponseCount(namespace, config, name, http.StatusRequestEntityTooLarge, 1, 1.0)
a.reporter.ReportResponseTime(namespace, config, name, http.StatusRequestEntityTooLarge, time.Now().Sub(start))
return
}

namespace := r.Header.Get(controller.GetRevisionHeaderNamespace())
name := r.Header.Get(controller.GetRevisionHeaderName())
endpoint, status, err := a.act.ActiveEndpoint(namespace, name)
endpoint, status, err := a.act.ActiveEndpoint(namespace, config, name)
if err != nil {
msg := fmt.Sprintf("Error getting active endpoint: %v", err)
a.logger.Error(msg)
http.Error(w, msg, int(status))
a.logger.Errorf(msg)
a.reporter.ReportResponseCount(namespace, config, name, int(status), 1, 1.0)
a.reporter.ReportResponseTime(namespace, config, name, int(status), time.Now().Sub(start))
return
}
target := &url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port),
}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.Transport = retryRoundTripper{
logger: a.logger,
proxy.Transport = &retryRoundTripper{
logger: a.logger,
reporter: a.reporter,
start: start,
}
proxy.ServeHTTP(w, r)
}
Expand Down Expand Up @@ -174,9 +193,22 @@ func main() {
logger.Fatal("Error building serving clientset: %v", zap.Error(err))
}

a := activator.NewRevisionActivator(kubeClient, servingClient, logger)
logger.Info("Initializing OpenCensus Prometheus exporter.")
promExporter, err := prometheus.NewExporter(prometheus.Options{Namespace: "activator"})
if err != nil {
logger.Fatal("Failed to create the Prometheus exporter: %v", zap.Error(err))
}
view.RegisterExporter(promExporter)
view.SetReportingPeriod(10 * time.Second)

reporter, err := activator.NewStatsReporter()
if err != nil {
logger.Fatal("Failed to create stats reporter: %v", zap.Error(err))
}

a := activator.NewRevisionActivator(kubeClient, servingClient, logger, reporter)
a = activator.NewDedupingActivator(a)
ah := &activationHandler{a, logger}
ah := &activationHandler{a, logger, reporter}

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
Expand All @@ -192,6 +224,9 @@ func main() {
logger.Fatalf("failed to start configuration manager: %v", err)
}

http.HandleFunc("/", ah.handler)
h2c.ListenAndServe(":8080", nil)
// Start the endpoint for Prometheus scraping
mux := http.NewServeMux()
mux.HandleFunc("/", ah.handler)
mux.Handle("/metrics", promExporter)
h2c.ListenAndServe(":8080", mux)
}
2 changes: 1 addition & 1 deletion config/activator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ spec:
# and substituted here.
image: github.com/knative/serving/cmd/activator
ports:
- name: http
- name: activator-port
containerPort: 8080
args:
# Disable glog writing into stderr. Our code doesn't use glog
Expand Down
Loading