88 changes: 41 additions & 47 deletions pkg/queue/sharedmain/main.go
@@ -22,7 +22,6 @@ import (
"fmt"
"net/http"
"os"
"strconv"
"time"

"github.com/kelseyhightower/envconfig"
@@ -38,7 +37,6 @@ import (
"knative.dev/pkg/logging/logkey"
"knative.dev/pkg/metrics"
pkgnet "knative.dev/pkg/network"
"knative.dev/pkg/profiling"
"knative.dev/pkg/signals"
"knative.dev/pkg/tracing"
tracingconfig "knative.dev/pkg/tracing/config"
@@ -72,6 +70,9 @@ const (
// QPOptionTokenDirPath is a directory for per audience tokens
// This path is used by QP Options (Extensions) as <QPOptionTokenDirPath>/<Audience>
QPOptionTokenDirPath = queue.TokenDirectory

// shutdownLimit is the maximum time we wait for a server to shutdown after a Kill
shutdownLimit = 5 * time.Second
)

type config struct {
@@ -157,6 +158,12 @@ type Defaults struct {
Transport http.RoundTripper
}

type httpServer struct {
name string
srv *http.Server
tls bool
}

type Option func(*Defaults)

func init() {
@@ -236,57 +243,46 @@ func Main(opts ...Option) error {
concurrencyendpoint = queue.NewConcurrencyEndpoint(env.ConcurrencyStateEndpoint, env.ConcurrencyStateTokenPath)
}

var srvs []httpServer

// Enable TLS when certificate is mounted.
tlsEnabled := exists(logger, certPath) && exists(logger, keyPath)
encryptionEnabled := exists(logger, certPath) && exists(logger, keyPath)

mainHandler, drainer := mainHandler(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint)
adminHandler := adminHandler(d.Ctx, logger, drainer)

// Enable TLS server when activator server certs are mounted.
// At this moment activator with TLS does not disable HTTP.
// Start main httpServer regardless if tlsEnabled is true
// See also https://github.com/knative/serving/issues/12808.
httpServers := map[string]*http.Server{
"main": mainServer(":"+env.QueueServingPort, mainHandler),
"admin": adminServer(":"+strconv.Itoa(networking.QueueAdminPort), adminHandler),
"metrics": metricsServer(protoStatReporter),
srvs = append(srvs, buildMainHTTPServer(mainHandler, env, false))
if encryptionEnabled {
// add mainTls on top of the main httpServer
srvs = append(srvs, buildMainHTTPServer(mainHandler, env, true))
}

srvs = append(srvs, buildAdminHTTPServer(adminHandler, encryptionEnabled))
srvs = append(srvs, buildMetricsHTTPServer(protoStatReporter))
if env.EnableProfiling {
httpServers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true))
}

tlsServers := map[string]*http.Server{
"main": mainServer(":"+env.QueueServingTLSPort, mainHandler),
"admin": adminServer(":"+strconv.Itoa(networking.QueueAdminPort), adminHandler),
srvs = append(srvs, buildProfilingHTTPServer(logger))
}

if tlsEnabled {
// Drop admin http server since the admin TLS server is listening on the same port
delete(httpServers, "admin")
Review comment (Member):
Looks like the prior admin server supported TLS, but now it is only served over HTTP. I'm not sure whether the internal encryption design required this admin server to use TLS only.
cc @nak3

Reply (Contributor, author):
@dprotaso no change was made to the server; it was called "tls", but it was not TLS before. It was HTTP and it remains HTTP after this PR.

Reply (Member):
But it's listening with s.ListenAndServeTLS here.

Reply (Contributor, author):
Oh, nice catch, I missed that and will fix it. It seems this change was not reflected in my testing nor caught by any unit or e2e tests...
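
An illustrative follow-up, not part of the PR: since this behavior change was not caught by existing tests, a small unit test in the sharedmain package could pin down the admin server's protocol. The sketch below assumes the buildAdminHTTPServer helper and httpServer struct introduced in this PR's servers.go; the expectation for the encryption-enabled case should be set to whatever the design actually requires.

	func TestAdminServerTLSFlag(t *testing.T) {
		// With encryption disabled the admin server must stay plain HTTP.
		if s := buildAdminHTTPServer(http.NotFoundHandler(), false); s.tls {
			t.Fatal("admin server should not be marked TLS when encryption is disabled")
		}
		// With encryption enabled this PR marks the admin server as TLS; flip this
		// expectation if the design wants admin to remain plain HTTP.
		if s := buildAdminHTTPServer(http.NotFoundHandler(), true); !s.tls {
			t.Fatal("admin server expected to be marked TLS when encryption is enabled")
		}
	}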

} else {
tlsServers = map[string]*http.Server{}
}

logger.Info("Starting queue-proxy")

errCh := make(chan error)
for name, server := range httpServers {
go func(name string, s *http.Server) {
for _, s := range srvs {
go func(s httpServer) {
logger.Debugf("Starting httpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls)
// Don't forward ErrServerClosed as that indicates we're already shutting down.
logger.Info("Starting http server ", name, s.Addr)
if err := s.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- fmt.Errorf("%s server failed to serve: %w", name, err)
var err error
if s.tls {
err = s.srv.ListenAndServeTLS(certPath, keyPath)
} else {
err = s.srv.ListenAndServe()
Review comment (Member):
nit: it was easier to reason about which servers were TLS and which were not prior to this refactor, because we had distinct lists. Now it's necessary to look at how each server is constructed.

Reply (Contributor, author):
Having a single list helps simplify the code in a number of places and avoids unnecessary code repetition.

}
}(name, server)
}
for name, server := range tlsServers {
go func(name string, s *http.Server) {
// Don't forward ErrServerClosed as that indicates we're already shutting down.
logger.Info("Starting tls server ", name, s.Addr)
if err := s.ListenAndServeTLS(certPath, keyPath); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- fmt.Errorf("%s server failed to serve: %w", name, err)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- fmt.Errorf("%s server failed to serve: %w", s.name, err)
}
}(name, server)
logger.Debugf("Ended httpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls)
}(s)
}

// Blocks until we actually receive a TERM signal or one of the servers
@@ -304,19 +300,17 @@ func Main(opts ...Option) error {
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
drainer.Drain()

for name, srv := range httpServers {
logger.Info("Shutting down server: ", name)
if err := srv.Shutdown(context.Background()); err != nil {
logger.Errorw("Failed to shutdown server", zap.String("server", name), zap.Error(err))
// Shutdown() to all httpServers, wait no more than shutdownLimit for graceful termination
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownLimit)
defer shutdownRelease()
for _, s := range srvs {
logger.Info("Shutting down server: ", s.name)
if err := s.srv.Shutdown(shutdownCtx); err != nil {
logger.Errorw("Failed to shutdown server", zap.String("server", s.name), zap.Error(err))
}
}
for name, srv := range tlsServers {
logger.Info("Shutting down server: ", name)
if err := srv.Shutdown(context.Background()); err != nil {
logger.Errorw("Failed to shutdown server", zap.String("server", name), zap.Error(err))
}
}
logger.Info("Shutdown complete, exiting...")

logger.Info("Shutdown of all servers is now complete")
}
return nil
}
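
One illustrative note on the shutdown change above, not part of the PR: because a single shutdownCtx is created once and reused across the sequential Shutdown calls, the 5-second shutdownLimit is a combined budget for all servers rather than a per-server budget. If a per-server limit were intended, the timeout could instead be created inside the loop, for example:

	for _, s := range srvs {
		logger.Info("Shutting down server: ", s.name)
		// Hypothetical variant: give every server its own shutdownLimit window.
		ctx, cancel := context.WithTimeout(context.Background(), shutdownLimit)
		if err := s.srv.Shutdown(ctx); err != nil {
			logger.Errorw("Failed to shutdown server", zap.String("server", s.name), zap.Error(err))
		}
		cancel()
	}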
60 changes: 46 additions & 14 deletions pkg/queue/sharedmain/servers.go
@@ -21,31 +21,63 @@ import (
"strconv"
"time"

"go.uber.org/zap"
pkgnet "knative.dev/pkg/network"
"knative.dev/pkg/profiling"
"knative.dev/serving/pkg/networking"
"knative.dev/serving/pkg/queue"
)

func mainServer(addr string, handler http.Handler) *http.Server {
return pkgnet.NewServer(addr, handler)
func buildMainHTTPServer(composedHandler http.Handler, env config, enableTLS bool) httpServer {
var port, name string
if enableTLS {
name = "mainTls"
port = env.QueueServingTLSPort
} else {
name = "main"
port = env.QueueServingPort
}

return httpServer{
name: name,
tls: enableTLS,
srv: pkgnet.NewServer(":"+port, composedHandler),
}
}

func adminServer(addr string, handler http.Handler) *http.Server {
return &http.Server{
Addr: addr,
Handler: handler,
// https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6
ReadHeaderTimeout: time.Minute,
func buildAdminHTTPServer(handler http.Handler, enableTLS bool) httpServer {
return httpServer{
name: "admin",
tls: enableTLS,
srv: &http.Server{
Addr: ":" + strconv.Itoa(networking.QueueAdminPort),
Handler: handler,
//https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6
ReadHeaderTimeout: time.Minute,
},
}
}

func metricsServer(reporter *queue.ProtobufStatsReporter) *http.Server {
func buildMetricsHTTPServer(protobufStatReporter *queue.ProtobufStatsReporter) httpServer {
metricsMux := http.NewServeMux()
metricsMux.Handle("/metrics", queue.NewStatsHandler(reporter))
metricsMux.Handle("/metrics", queue.NewStatsHandler(protobufStatReporter))

return httpServer{
name: "metrics",
tls: false,
srv: &http.Server{
Addr: ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort),
Handler: metricsMux,
//https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6
ReadHeaderTimeout: time.Minute,
},
}
}

return &http.Server{
Addr: ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort),
Handler: metricsMux,
ReadHeaderTimeout: time.Minute, //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6
func buildProfilingHTTPServer(logger *zap.SugaredLogger) httpServer {
return httpServer{
name: "profile",
tls: false,
srv: profiling.NewServer(profiling.NewHandler(logger, true)),
}
}