Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 49 additions & 15 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
h2cutil "github.com/knative/serving/pkg/h2c"
"github.com/knative/serving/pkg/logging"
"github.com/knative/serving/pkg/signals"
"github.com/knative/serving/third_party/h2c"
"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -45,15 +46,18 @@ const (
)

type activationHandler struct {
act activator.Activator
logger *zap.SugaredLogger
act activator.Activator
logger *zap.SugaredLogger
reporter activator.StatsReporter
}

// retryRoundTripper retries on 503's for up to 60 seconds. The reason is there is
// a small delay for k8s to include the ready IP in service.
// https://github.com/knative/serving/issues/660#issuecomment-384062553
type retryRoundTripper struct {
logger *zap.SugaredLogger
logger *zap.SugaredLogger
reporter activator.StatsReporter
start time.Time
}

func (rrt retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface implementation should be on the pointer, not the value type.

func (rrt *retryRoundTripper) RoundTrip(...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Expand Down Expand Up @@ -106,26 +110,38 @@ func (rrt retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error)

resp, err = transport.RoundTrip(r)
}
// TODO: add metrics for number of tries and the response code.

if resp != nil {
rrt.logger.Infof("It took %d tries to get response code %d", i, resp.StatusCode)
namespace := r.Header.Get(controller.GetRevisionHeaderNamespace())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is these are empty? Should we still send metrics?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/s/is/if/

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's empty or doesn't exist, we'll get 500 errors based on

revision, err := revisionClient.Get(rev.name, metav1.GetOptions{})

I think we should report error for the given headers -- all requests routed to activator should have the headers set. If not, we can use the metrics to debug.

name := r.Header.Get(controller.GetRevisionHeaderName())
config := r.Header.Get(controller.GetConfigurationHeader())
rrt.reporter.ReportResponseCount(namespace, config, name, resp.StatusCode, i, 1.0)
rrt.reporter.ReportResponseTime(namespace, config, name, resp.StatusCode, time.Now().Sub(rrt.start))
}
return resp, err
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not report anything when all retries fail with an error (i.e. no response)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can do that. In that case, what status code dimension you would suggest to use?

}

func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) {
namespace := r.Header.Get(controller.GetRevisionHeaderNamespace())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of this new functionality in main.go have any unit tests - see #1585 that refactoring and adding unit tests. I would prefer waiting for that to get in and adding this functionality such that it is tested properly.

All of these should be easy to test - inject headers, test edge cases, see if the reporter is getting called, ...etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that pull request #1585 takes long time, I think we can get activator dashboard in first and then add unit tests based on #1585 later.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note, I moved #1585 to #1689

name := r.Header.Get(controller.GetRevisionHeaderName())
config := r.Header.Get(controller.GetConfigurationHeader())
start := time.Now()

if r.ContentLength > maxUploadBytes {
w.WriteHeader(http.StatusRequestEntityTooLarge)
a.reporter.ReportResponseCount(namespace, config, name, http.StatusRequestEntityTooLarge, 1, 1.0)
a.reporter.ReportResponseTime(namespace, config, name, http.StatusRequestEntityTooLarge, time.Now().Sub(start))
return
}

namespace := r.Header.Get(controller.GetRevisionHeaderNamespace())
name := r.Header.Get(controller.GetRevisionHeaderName())
endpoint, status, err := a.act.ActiveEndpoint(namespace, name)
endpoint, status, err := a.act.ActiveEndpoint(namespace, config, name)
if err != nil {
msg := fmt.Sprintf("Error getting active endpoint: %v", err)
a.logger.Errorf(msg)
http.Error(w, msg, int(status))
a.logger.Errorf(msg)
a.reporter.ReportResponseCount(namespace, config, name, int(status), 1, 1.0)
a.reporter.ReportResponseTime(namespace, config, name, int(status), time.Now().Sub(start))
return
}
target := &url.URL{
Expand All @@ -134,7 +150,9 @@ func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) {
}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.Transport = retryRoundTripper{
logger: a.logger,
logger: a.logger,
reporter: a.reporter,
start: start,
}

// TODO: Clear the host to avoid 404's.
Expand Down Expand Up @@ -168,9 +186,22 @@ func main() {
logger.Fatal("Error building serving clientset: %v", zap.Error(err))
}

a := activator.NewRevisionActivator(kubeClient, servingClient, logger)
a = activator.NewDedupingActivator(a)
ah := &activationHandler{a, logger}
logger.Info("Initializing OpenCensus Prometheus exporter.")
promExporter, err := prometheus.NewExporter(prometheus.Options{Namespace: "activator"})
if err != nil {
logger.Fatal("Failed to create the Prometheus exporter: %v", zap.Error(err))
}
view.RegisterExporter(promExporter)
view.SetReportingPeriod(10 * time.Second)

reporter, err := activator.NewStatsReporter()
if err != nil {
logger.Fatal("Failed to create stats reporter: %v", zap.Error(err))
}

a := activator.NewRevisionActivator(kubeClient, servingClient, logger, reporter)
a = activator.NewDedupingActivator(a, servingClient, logger, reporter)
ah := &activationHandler{a, logger, reporter}

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
Expand All @@ -179,6 +210,9 @@ func main() {
a.Shutdown()
}()

http.HandleFunc("/", ah.handler)
h2c.ListenAndServe(":8080", nil)
// Start the endpoint for Prometheus scraping
mux := http.NewServeMux()
mux.HandleFunc("/", ah.handler)
mux.Handle("/metrics", promExporter)
http.ListenAndServe(":8080", mux)
}
2 changes: 1 addition & 1 deletion config/activator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ spec:
# and substituted here.
image: github.com/knative/serving/cmd/activator
ports:
- name: http
- name: activator-port
containerPort: 8080
args:
- "-logtostderr=true"
Expand Down
Loading