diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index a98e44ca6a8a..51a714af658d 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -22,7 +22,6 @@ import ( "fmt" "net/http" "os" - "strconv" "time" "github.com/kelseyhightower/envconfig" @@ -38,7 +37,6 @@ import ( "knative.dev/pkg/logging/logkey" "knative.dev/pkg/metrics" pkgnet "knative.dev/pkg/network" - "knative.dev/pkg/profiling" "knative.dev/pkg/signals" "knative.dev/pkg/tracing" tracingconfig "knative.dev/pkg/tracing/config" @@ -72,6 +70,9 @@ const ( // QPOptionTokenDirPath is a directory for per audience tokens // This path is used by QP Options (Extensions) as / QPOptionTokenDirPath = queue.TokenDirectory + + // shutdownLimit is the maximum time we wait for a server to shutdown after a Kill + shutdownLimit = 5 * time.Second ) type config struct { @@ -157,6 +158,12 @@ type Defaults struct { Transport http.RoundTripper } +type httpServer struct { + name string + srv *http.Server + tls bool +} + type Option func(*Defaults) func init() { @@ -236,57 +243,46 @@ func Main(opts ...Option) error { concurrencyendpoint = queue.NewConcurrencyEndpoint(env.ConcurrencyStateEndpoint, env.ConcurrencyStateTokenPath) } + var srvs []httpServer + // Enable TLS when certificate is mounted. - tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) + encryptionEnabled := exists(logger, certPath) && exists(logger, keyPath) mainHandler, drainer := mainHandler(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint) adminHandler := adminHandler(d.Ctx, logger, drainer) // Enable TLS server when activator server certs are mounted. // At this moment activator with TLS does not disable HTTP. + // Start main httpServer regardless if tlsEnabled is true // See also https://github.com/knative/serving/issues/12808. - httpServers := map[string]*http.Server{ - "main": mainServer(":"+env.QueueServingPort, mainHandler), - "admin": adminServer(":"+strconv.Itoa(networking.QueueAdminPort), adminHandler), - "metrics": metricsServer(protoStatReporter), + srvs = append(srvs, buildMainHTTPServer(mainHandler, env, false)) + if encryptionEnabled { + // add mainTls on top of the main httpServer + srvs = append(srvs, buildMainHTTPServer(mainHandler, env, true)) } - + srvs = append(srvs, buildAdminHTTPServer(adminHandler, encryptionEnabled)) + srvs = append(srvs, buildMetricsHTTPServer(protoStatReporter)) if env.EnableProfiling { - httpServers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true)) - } - - tlsServers := map[string]*http.Server{ - "main": mainServer(":"+env.QueueServingTLSPort, mainHandler), - "admin": adminServer(":"+strconv.Itoa(networking.QueueAdminPort), adminHandler), + srvs = append(srvs, buildProfilingHTTPServer(logger)) } - - if tlsEnabled { - // Drop admin http server since the admin TLS server is listening on the same port - delete(httpServers, "admin") - } else { - tlsServers = map[string]*http.Server{} - } - logger.Info("Starting queue-proxy") errCh := make(chan error) - for name, server := range httpServers { - go func(name string, s *http.Server) { + for _, s := range srvs { + go func(s httpServer) { + logger.Debugf("Starting httpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) // Don't forward ErrServerClosed as that indicates we're already shutting down. - logger.Info("Starting http server ", name, s.Addr) - if err := s.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - errCh <- fmt.Errorf("%s server failed to serve: %w", name, err) + var err error + if s.tls { + err = s.srv.ListenAndServeTLS(certPath, keyPath) + } else { + err = s.srv.ListenAndServe() } - }(name, server) - } - for name, server := range tlsServers { - go func(name string, s *http.Server) { - // Don't forward ErrServerClosed as that indicates we're already shutting down. - logger.Info("Starting tls server ", name, s.Addr) - if err := s.ListenAndServeTLS(certPath, keyPath); err != nil && !errors.Is(err, http.ErrServerClosed) { - errCh <- fmt.Errorf("%s server failed to serve: %w", name, err) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + errCh <- fmt.Errorf("%s server failed to serve: %w", s.name, err) } - }(name, server) + logger.Debugf("Ended httpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) + }(s) } // Blocks until we actually receive a TERM signal or one of the servers @@ -304,19 +300,17 @@ func Main(opts ...Option) error { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) drainer.Drain() - for name, srv := range httpServers { - logger.Info("Shutting down server: ", name) - if err := srv.Shutdown(context.Background()); err != nil { - logger.Errorw("Failed to shutdown server", zap.String("server", name), zap.Error(err)) + // Shutdown() to all httpServers, wait no more than shutdownLimit for graceful termination + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownLimit) + defer shutdownRelease() + for _, s := range srvs { + logger.Info("Shutting down server: ", s.name) + if err := s.srv.Shutdown(shutdownCtx); err != nil { + logger.Errorw("Failed to shutdown server", zap.String("server", s.name), zap.Error(err)) } } - for name, srv := range tlsServers { - logger.Info("Shutting down server: ", name) - if err := srv.Shutdown(context.Background()); err != nil { - logger.Errorw("Failed to shutdown server", zap.String("server", name), zap.Error(err)) - } - } - logger.Info("Shutdown complete, exiting...") + + logger.Info("Shutdown of all servers is now complete") } return nil } diff --git a/pkg/queue/sharedmain/servers.go b/pkg/queue/sharedmain/servers.go index d8e1fe9a3486..a6d638743b2c 100644 --- a/pkg/queue/sharedmain/servers.go +++ b/pkg/queue/sharedmain/servers.go @@ -21,31 +21,63 @@ import ( "strconv" "time" + "go.uber.org/zap" pkgnet "knative.dev/pkg/network" + "knative.dev/pkg/profiling" "knative.dev/serving/pkg/networking" "knative.dev/serving/pkg/queue" ) -func mainServer(addr string, handler http.Handler) *http.Server { - return pkgnet.NewServer(addr, handler) +func buildMainHTTPServer(composedHandler http.Handler, env config, enableTLS bool) httpServer { + var port, name string + if enableTLS { + name = "mainTls" + port = env.QueueServingTLSPort + } else { + name = "main" + port = env.QueueServingPort + } + + return httpServer{ + name: name, + tls: enableTLS, + srv: pkgnet.NewServer(":"+port, composedHandler), + } } -func adminServer(addr string, handler http.Handler) *http.Server { - return &http.Server{ - Addr: addr, - Handler: handler, - // https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 - ReadHeaderTimeout: time.Minute, +func buildAdminHTTPServer(handler http.Handler, enableTLS bool) httpServer { + return httpServer{ + name: "admin", + tls: enableTLS, + srv: &http.Server{ + Addr: ":" + strconv.Itoa(networking.QueueAdminPort), + Handler: handler, + //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 + ReadHeaderTimeout: time.Minute, + }, } } -func metricsServer(reporter *queue.ProtobufStatsReporter) *http.Server { +func buildMetricsHTTPServer(protobufStatReporter *queue.ProtobufStatsReporter) httpServer { metricsMux := http.NewServeMux() - metricsMux.Handle("/metrics", queue.NewStatsHandler(reporter)) + metricsMux.Handle("/metrics", queue.NewStatsHandler(protobufStatReporter)) + + return httpServer{ + name: "metrics", + tls: false, + srv: &http.Server{ + Addr: ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort), + Handler: metricsMux, + //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 + ReadHeaderTimeout: time.Minute, + }, + } +} - return &http.Server{ - Addr: ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort), - Handler: metricsMux, - ReadHeaderTimeout: time.Minute, //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 +func buildProfilingHTTPServer(logger *zap.SugaredLogger) httpServer { + return httpServer{ + name: "profile", + tls: false, + srv: profiling.NewServer(profiling.NewHandler(logger, true)), } }