From cb1ad3b5b97114e30f76242c1f68171b1716cc5f Mon Sep 17 00:00:00 2001 From: David Hadas Date: Tue, 21 Feb 2023 13:07:50 +0200 Subject: [PATCH 01/15] Fix initializations and terminations of http servers Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 80 ++++++++++++++++++------------------ 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index 4c05d9e2fb0b..e7c2cf1a6417 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -243,52 +243,51 @@ func Main(opts ...Option) error { concurrencyendpoint = queue.NewConcurrencyEndpoint(env.ConcurrencyStateEndpoint, env.ConcurrencyStateTokenPath) } + type service struct { + name string + srv *http.Server + tls bool + } + + var srvs []service + var drainer *pkghandler.Drainer + var mainServer *http.Server + // Enable TLS when certificate is mounted. tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) - mainServer, drainer := buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) - httpServers := map[string]*http.Server{ - "main": mainServer, - "metrics": buildMetricsServer(protoStatReporter), - "admin": buildAdminServer(d.Ctx, logger, drainer), - } - if env.EnableProfiling { - httpServers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true)) - } - - // Enable TLS server when activator server certs are mounted. - // At this moment activator with TLS does not disable HTTP. - // See also https://github.com/knative/serving/issues/12808. - var tlsServers map[string]*http.Server if tlsEnabled { - mainTLSServer, drainer := buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true /* enable TLS */) - tlsServers = map[string]*http.Server{ - "tlsMain": mainTLSServer, - "tlsAdmin": buildAdminServer(d.Ctx, logger, drainer), - } - // Drop admin http server as we Use TLS for the admin server. - // TODO: The drain created with mainServer above is lost. Unify the two drain. - delete(httpServers, "admin") + mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) + srvs = append(srvs, service{name: "tlsMain", srv: mainServer, tls: true}) + srvs = append(srvs, service{name: "tlsAdmin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: true}) + } else { + mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) + srvs = append(srvs, service{name: "main", srv: mainServer, tls: false}) + srvs = append(srvs, service{name: "admin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: false}) + } + srvs = append(srvs, service{name: "metrics", srv: buildMetricsServer(protoStatReporter), tls: false}) + if env.EnableProfiling { + srvs = append(srvs, service{name: "profile", srv: profiling.NewServer(profiling.NewHandler(logger, true)), tls: false}) } logger.Info("Starting queue-proxy") errCh := make(chan error) - for name, server := range httpServers { - go func(name string, s *http.Server) { + for _, s := range srvs { + go func(s service) { + logger.Debugf("Starting service %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) // Don't forward ErrServerClosed as that indicates we're already shutting down. - if err := s.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - errCh <- fmt.Errorf("%s server failed to serve: %w", name, err) + var err error + if s.tls { + err = s.srv.ListenAndServeTLS(certPath, keyPath) + } else { + err = s.srv.ListenAndServe() } - }(name, server) - } - for name, server := range tlsServers { - go func(name string, s *http.Server) { - // Don't forward ErrServerClosed as that indicates we're already shutting down. - if err := s.ListenAndServeTLS(certPath, keyPath); err != nil && !errors.Is(err, http.ErrServerClosed) { - errCh <- fmt.Errorf("%s server failed to serve: %w", name, err) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + errCh <- fmt.Errorf("%s server failed to serve: %w", s.name, err) } - }(name, server) + logger.Debugf("Ended service %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) + }(s) } // Blocks until we actually receive a TERM signal or one of the servers @@ -306,13 +305,12 @@ func Main(opts ...Option) error { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) drainer.Drain() - // Removing the main server from the shutdown logic as we've already shut it down. - delete(httpServers, "main") - - for serverName, srv := range httpServers { - logger.Info("Shutting down server: ", serverName) - if err := srv.Shutdown(context.Background()); err != nil { - logger.Errorw("Failed to shutdown server", zap.String("server", serverName), zap.Error(err)) + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownRelease() + for _, s := range srvs { + logger.Info("Shutting down server: ", s.name) + if err := s.srv.Shutdown(shutdownCtx); err != nil { + logger.Errorw("Failed to shutdown server", zap.String("server", s.name), zap.Error(err)) } } logger.Info("Shutdown complete, exiting...") From f68fa708ef200a3c02cb22749cf3d3905fa7d4d8 Mon Sep 17 00:00:00 2001 From: David Hadas Date: Tue, 21 Feb 2023 16:31:05 +0200 Subject: [PATCH 02/15] Adding admin even when tlsAdmin is added Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index e7c2cf1a6417..34909982d1d7 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -263,8 +263,8 @@ func Main(opts ...Option) error { } else { mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) srvs = append(srvs, service{name: "main", srv: mainServer, tls: false}) - srvs = append(srvs, service{name: "admin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: false}) } + srvs = append(srvs, service{name: "admin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: false}) srvs = append(srvs, service{name: "metrics", srv: buildMetricsServer(protoStatReporter), tls: false}) if env.EnableProfiling { srvs = append(srvs, service{name: "profile", srv: profiling.NewServer(profiling.NewHandler(logger, true)), tls: false}) From f7b1cb1325c5fa39d126c51a706627d6cca9dcd2 Mon Sep 17 00:00:00 2001 From: David Hadas Date: Tue, 21 Feb 2023 17:49:54 +0200 Subject: [PATCH 03/15] Try out with shutdown to see if e2e kurier-tls pass Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index 34909982d1d7..fce827ab6ac1 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -305,14 +305,16 @@ func Main(opts ...Option) error { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) drainer.Drain() - shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 10*time.Second) - defer shutdownRelease() - for _, s := range srvs { - logger.Info("Shutting down server: ", s.name) - if err := s.srv.Shutdown(shutdownCtx); err != nil { - logger.Errorw("Failed to shutdown server", zap.String("server", s.name), zap.Error(err)) + /* + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownRelease() + for _, s := range srvs { + logger.Info("Shutting down server: ", s.name) + if err := s.srv.Shutdown(shutdownCtx); err != nil { + logger.Errorw("Failed to shutdown server", zap.String("server", s.name), zap.Error(err)) + } } - } + */ logger.Info("Shutdown complete, exiting...") } return nil From 2b03c1bbea1f9bfe458b12ca085933ae7a1409b2 Mon Sep 17 00:00:00 2001 From: David Hadas Date: Tue, 21 Feb 2023 18:34:13 +0200 Subject: [PATCH 04/15] Seperate Drainers trying to get kurier tls e2e to pass Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index fce827ab6ac1..1e8b5df077cf 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -256,12 +256,13 @@ func Main(opts ...Option) error { // Enable TLS when certificate is mounted. tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) + mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) if tlsEnabled { - mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) + var drainerTls *pkghandler.Drainer + mainServer, drainerTls = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) srvs = append(srvs, service{name: "tlsMain", srv: mainServer, tls: true}) - srvs = append(srvs, service{name: "tlsAdmin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: true}) + srvs = append(srvs, service{name: "tlsAdmin", srv: buildAdminServer(d.Ctx, logger, drainerTls), tls: true}) } else { - mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) srvs = append(srvs, service{name: "main", srv: mainServer, tls: false}) } srvs = append(srvs, service{name: "admin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: false}) @@ -305,16 +306,15 @@ func Main(opts ...Option) error { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) drainer.Drain() - /* - shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 10*time.Second) - defer shutdownRelease() - for _, s := range srvs { - logger.Info("Shutting down server: ", s.name) - if err := s.srv.Shutdown(shutdownCtx); err != nil { - logger.Errorw("Failed to shutdown server", zap.String("server", s.name), zap.Error(err)) - } + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownRelease() + for _, s := range srvs { + logger.Info("Shutting down server: ", s.name) + if err := s.srv.Shutdown(shutdownCtx); err != nil { + logger.Errorw("Failed to shutdown server", zap.String("server", s.name), zap.Error(err)) } - */ + } + logger.Info("Shutdown complete, exiting...") } return nil From 7534c4e56fd8659050ad6397eb0ebbcd96c0d70d Mon Sep 17 00:00:00 2001 From: David Hadas Date: Tue, 21 Feb 2023 19:51:15 +0200 Subject: [PATCH 05/15] Trying to wait longer at Shutdown Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index 1e8b5df077cf..65d013bdad83 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -256,17 +256,18 @@ func Main(opts ...Option) error { // Enable TLS when certificate is mounted. tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) - mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) if tlsEnabled { - var drainerTls *pkghandler.Drainer - mainServer, drainerTls = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) + mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) srvs = append(srvs, service{name: "tlsMain", srv: mainServer, tls: true}) - srvs = append(srvs, service{name: "tlsAdmin", srv: buildAdminServer(d.Ctx, logger, drainerTls), tls: true}) + srvs = append(srvs, service{name: "tlsAdmin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: true}) } else { + mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) srvs = append(srvs, service{name: "main", srv: mainServer, tls: false}) + srvs = append(srvs, service{name: "admin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: false}) } - srvs = append(srvs, service{name: "admin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: false}) + srvs = append(srvs, service{name: "metrics", srv: buildMetricsServer(protoStatReporter), tls: false}) + if env.EnableProfiling { srvs = append(srvs, service{name: "profile", srv: profiling.NewServer(profiling.NewHandler(logger, true)), tls: false}) } @@ -306,7 +307,8 @@ func Main(opts ...Option) error { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) drainer.Drain() - shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 10*time.Second) + // Shutdown() to all services, wait no more than 600 sec for graceful termination + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 600*time.Second) defer shutdownRelease() for _, s := range srvs { logger.Info("Shutting down server: ", s.name) From 3db33491300d66c268addc0c520f4d88b952673c Mon Sep 17 00:00:00 2001 From: David Hadas Date: Thu, 23 Feb 2023 17:06:18 +0200 Subject: [PATCH 06/15] added main in tls mode, unified admin and adminTls, unified the drainers, moved to struct service being returned from build* functions Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 100 +++++++++++++++++++++++++---------- 1 file changed, 72 insertions(+), 28 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index 65d013bdad83..13e84c1ebfa3 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -24,6 +24,7 @@ import ( "net/http" "os" "strconv" + "sync" "time" "github.com/kelseyhightower/envconfig" @@ -164,6 +165,12 @@ type Defaults struct { Transport http.RoundTripper } +type service struct { + name string + srv *http.Server + tls bool +} + type Option func(*Defaults) func init() { @@ -243,33 +250,33 @@ func Main(opts ...Option) error { concurrencyendpoint = queue.NewConcurrencyEndpoint(env.ConcurrencyStateEndpoint, env.ConcurrencyStateTokenPath) } - type service struct { - name string - srv *http.Server - tls bool - } - var srvs []service var drainer *pkghandler.Drainer - var mainServer *http.Server + var drainerTLS *pkghandler.Drainer + var mainServer service // Enable TLS when certificate is mounted. tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) + // At this moment activator with TLS does not disable HTTP. + // Start main service regardless if tlsEnabled is true + // See also https://github.com/knative/serving/issues/12808. + mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) + srvs = append(srvs, mainServer) + if tlsEnabled { - mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) - srvs = append(srvs, service{name: "tlsMain", srv: mainServer, tls: true}) - srvs = append(srvs, service{name: "tlsAdmin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: true}) - } else { - mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) - srvs = append(srvs, service{name: "main", srv: mainServer, tls: false}) - srvs = append(srvs, service{name: "admin", srv: buildAdminServer(d.Ctx, logger, drainer), tls: false}) + // TODO: Unify the two drainers. Alternativly, Drain both (asynchrniously). + var mainServerTLS service + mainServerTLS, drainerTLS = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) + srvs = append(srvs, mainServerTLS) } - srvs = append(srvs, service{name: "metrics", srv: buildMetricsServer(protoStatReporter), tls: false}) + srvs = append(srvs, buildAdminServer(d.Ctx, logger, drainer, drainerTLS)) + + srvs = append(srvs, buildMetricsServer(protoStatReporter)) if env.EnableProfiling { - srvs = append(srvs, service{name: "profile", srv: profiling.NewServer(profiling.NewHandler(logger, true)), tls: false}) + srvs = append(srvs, buildProfilingServer(logger)) } logger.Info("Starting queue-proxy") @@ -305,7 +312,7 @@ func Main(opts ...Option) error { } logger.Info("Received TERM signal, attempting to gracefully shutdown servers.") logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) - drainer.Drain() + drain(drainer, drainerTLS) // Shutdown() to all services, wait no more than 600 sec for graceful termination shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 600*time.Second) @@ -342,7 +349,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 } func buildServer(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, - ce *queue.ConcurrencyEndpoint, enableTLS bool) (*http.Server, *pkghandler.Drainer) { + ce *queue.ConcurrencyEndpoint, enableTLS bool) (mainService service, drainer *pkghandler.Drainer) { // TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health. target := net.JoinHostPort("127.0.0.1", env.UserPort) @@ -397,7 +404,7 @@ func buildServer(ctx context.Context, env config, transport http.RoundTripper, p composedHandler = tracing.HTTPSpanMiddleware(composedHandler) } - drainer := &pkghandler.Drainer{ + drainer = &pkghandler.Drainer{ QuietPeriod: drainSleepDuration, // Add Activator probe header to the drainer so it can handle probes directly from activator HealthCheckUAPrefixes: []string{netheader.ActivatorUserAgent, netheader.AutoscalingUserAgent}, @@ -412,11 +419,16 @@ func buildServer(ctx context.Context, env config, transport http.RoundTripper, p composedHandler = requestLogHandler(logger, composedHandler, env) } + mainService.tls = enableTLS if enableTLS { - return pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler), drainer - } + mainService.name = `mainTls` + mainService.srv = pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler) - return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer + } else { + mainService.name = `main` + mainService.srv = pkgnet.NewServer(":"+env.QueueServingPort, composedHandler) + } + return mainService, drainer } func buildTransport(env config) http.RoundTripper { @@ -471,7 +483,27 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config, return true } -func buildAdminServer(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer) *http.Server { +// drain the two drainers +func drain(drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer) { + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + if drainer != nil { + drainer.Drain() + } + }() + go func() { + defer wg.Done() + if drainerTLS != nil { + drainerTLS.Drain() + } + }() + wg.Wait() +} + +func buildAdminServer(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer) (adminService service) { adminMux := http.NewServeMux() adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) { logger.Info("Attached drain handler from user-container", r) @@ -488,26 +520,38 @@ func buildAdminServer(ctx context.Context, logger *zap.SugaredLogger, drainer *p } }() - drainer.Drain() + drain(drainer, drainerTLS) w.WriteHeader(http.StatusOK) }) - - return &http.Server{ + adminService.tls = false + adminService.name = "admin" + adminService.srv = &http.Server{ Addr: ":" + strconv.Itoa(networking.QueueAdminPort), Handler: adminMux, ReadHeaderTimeout: time.Minute, //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 } + return } -func buildMetricsServer(protobufStatReporter *queue.ProtobufStatsReporter) *http.Server { +func buildMetricsServer(protobufStatReporter *queue.ProtobufStatsReporter) (metrixService service) { metricsMux := http.NewServeMux() metricsMux.Handle("/metrics", queue.NewStatsHandler(protobufStatReporter)) - return &http.Server{ + metrixService.name = "metrics" + metrixService.tls = false + metrixService.srv = &http.Server{ Addr: ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort), Handler: metricsMux, ReadHeaderTimeout: time.Minute, //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 } + return +} + +func buildProfilingServer(logger *zap.SugaredLogger) (profileingService service) { + profileingService.name = "profile" + profileingService.tls = false + profileingService.srv = profiling.NewServer(profiling.NewHandler(logger, true)) + return } func requestLogHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env config) http.Handler { From 9a25aea7fa25038c35cbbc49f374b660fc14e3a3 Mon Sep 17 00:00:00 2001 From: David Hadas Date: Thu, 23 Feb 2023 17:41:43 +0200 Subject: [PATCH 07/15] nits Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index 13e84c1ebfa3..d260c144b204 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -265,7 +265,6 @@ func Main(opts ...Option) error { srvs = append(srvs, mainServer) if tlsEnabled { - // TODO: Unify the two drainers. Alternativly, Drain both (asynchrniously). var mainServerTLS service mainServerTLS, drainerTLS = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) srvs = append(srvs, mainServerTLS) @@ -419,15 +418,17 @@ func buildServer(ctx context.Context, env config, transport http.RoundTripper, p composedHandler = requestLogHandler(logger, composedHandler, env) } - mainService.tls = enableTLS + var port string if enableTLS { mainService.name = `mainTls` - mainService.srv = pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler) + port = env.QueueServingTLSPort } else { mainService.name = `main` - mainService.srv = pkgnet.NewServer(":"+env.QueueServingPort, composedHandler) + port = env.QueueServingPort } + mainService.srv = pkgnet.NewServer(":"+port, composedHandler) + mainService.tls = enableTLS return mainService, drainer } From 2ec4a90a49ff29afd9a07d9970243de818d0b6c7 Mon Sep 17 00:00:00 2001 From: David Hadas Date: Thu, 23 Feb 2023 18:29:14 +0200 Subject: [PATCH 08/15] nits Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index d260c144b204..1720533ff5a9 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -263,27 +263,22 @@ func Main(opts ...Option) error { // See also https://github.com/knative/serving/issues/12808. mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) srvs = append(srvs, mainServer) - if tlsEnabled { var mainServerTLS service mainServerTLS, drainerTLS = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) srvs = append(srvs, mainServerTLS) } - srvs = append(srvs, buildAdminServer(d.Ctx, logger, drainer, drainerTLS)) - srvs = append(srvs, buildMetricsServer(protoStatReporter)) - if env.EnableProfiling { srvs = append(srvs, buildProfilingServer(logger)) } - logger.Info("Starting queue-proxy") errCh := make(chan error) for _, s := range srvs { go func(s service) { - logger.Debugf("Starting service %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) + logger.Debugf("Starting sever service %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) // Don't forward ErrServerClosed as that indicates we're already shutting down. var err error if s.tls { @@ -294,7 +289,7 @@ func Main(opts ...Option) error { if err != nil && !errors.Is(err, http.ErrServerClosed) { errCh <- fmt.Errorf("%s server failed to serve: %w", s.name, err) } - logger.Debugf("Ended service %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) + logger.Debugf("Ended sever service %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) }(s) } @@ -323,7 +318,7 @@ func Main(opts ...Option) error { } } - logger.Info("Shutdown complete, exiting...") + logger.Info("Shutdown of all servers is now complete") } return nil } @@ -422,7 +417,6 @@ func buildServer(ctx context.Context, env config, transport http.RoundTripper, p if enableTLS { mainService.name = `mainTls` port = env.QueueServingTLSPort - } else { mainService.name = `main` port = env.QueueServingPort From 1e568c783cfe79bacc541c959649bc51991ab259 Mon Sep 17 00:00:00 2001 From: David Hadas Date: Thu, 2 Mar 2023 14:37:11 +0200 Subject: [PATCH 09/15] nits Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index 1720533ff5a9..daa3229c5b84 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -80,6 +80,9 @@ const ( // QPOptionTokenDirPath is a directory for per audience tokens // This path is used by QP Options (Extensions) as / QPOptionTokenDirPath = queue.TokenDirectory + + // ShutdownLimit is the maximum time we wait for a server to shutdown after a Kill + ShutdownLimit = 600 * time.Second ) type config struct { @@ -251,9 +254,7 @@ func Main(opts ...Option) error { } var srvs []service - var drainer *pkghandler.Drainer var drainerTLS *pkghandler.Drainer - var mainServer service // Enable TLS when certificate is mounted. tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) @@ -261,7 +262,7 @@ func Main(opts ...Option) error { // At this moment activator with TLS does not disable HTTP. // Start main service regardless if tlsEnabled is true // See also https://github.com/knative/serving/issues/12808. - mainServer, drainer = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) + mainServer, drainer := buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) srvs = append(srvs, mainServer) if tlsEnabled { var mainServerTLS service @@ -308,8 +309,8 @@ func Main(opts ...Option) error { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) drain(drainer, drainerTLS) - // Shutdown() to all services, wait no more than 600 sec for graceful termination - shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 600*time.Second) + // Shutdown() to all services, wait no more than ShutdownLimit for graceful termination + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), ShutdownLimit) defer shutdownRelease() for _, s := range srvs { logger.Info("Shutting down server: ", s.name) @@ -343,7 +344,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 } func buildServer(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, - ce *queue.ConcurrencyEndpoint, enableTLS bool) (mainService service, drainer *pkghandler.Drainer) { + ce *queue.ConcurrencyEndpoint, enableTLS bool) (server service, drainer *pkghandler.Drainer) { // TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health. target := net.JoinHostPort("127.0.0.1", env.UserPort) @@ -415,15 +416,15 @@ func buildServer(ctx context.Context, env config, transport http.RoundTripper, p var port string if enableTLS { - mainService.name = `mainTls` + server.name = `mainTls` port = env.QueueServingTLSPort } else { - mainService.name = `main` + server.name = `main` port = env.QueueServingPort } - mainService.srv = pkgnet.NewServer(":"+port, composedHandler) - mainService.tls = enableTLS - return mainService, drainer + server.srv = pkgnet.NewServer(":"+port, composedHandler) + server.tls = enableTLS + return server, drainer } func buildTransport(env config) http.RoundTripper { @@ -542,10 +543,10 @@ func buildMetricsServer(protobufStatReporter *queue.ProtobufStatsReporter) (metr return } -func buildProfilingServer(logger *zap.SugaredLogger) (profileingService service) { - profileingService.name = "profile" - profileingService.tls = false - profileingService.srv = profiling.NewServer(profiling.NewHandler(logger, true)) +func buildProfilingServer(logger *zap.SugaredLogger) (profilingService service) { + profilingService.name = "profile" + profilingService.tls = false + profilingService.srv = profiling.NewServer(profiling.NewHandler(logger, true)) return } From 77e9a111b0d3097157ec8af5ae8bb632e22d25ac Mon Sep 17 00:00:00 2001 From: David Hadas Date: Thu, 2 Mar 2023 17:22:20 +0200 Subject: [PATCH 10/15] nits Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index daa3229c5b84..d7304450c436 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -416,10 +416,10 @@ func buildServer(ctx context.Context, env config, transport http.RoundTripper, p var port string if enableTLS { - server.name = `mainTls` + server.name = "mainTls" port = env.QueueServingTLSPort } else { - server.name = `main` + server.name = "main" port = env.QueueServingPort } server.srv = pkgnet.NewServer(":"+port, composedHandler) From c3a51d25799cbf1d5c3c727c8b0187b146c9e778 Mon Sep 17 00:00:00 2001 From: David Hadas Date: Fri, 3 Mar 2023 16:21:24 +0200 Subject: [PATCH 11/15] opt for lower shotdown limit + nits --- pkg/queue/sharedmain/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index d7304450c436..d095a9067f67 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -82,7 +82,7 @@ const ( QPOptionTokenDirPath = queue.TokenDirectory // ShutdownLimit is the maximum time we wait for a server to shutdown after a Kill - ShutdownLimit = 600 * time.Second + ShutdownLimit = 5 * time.Second ) type config struct { @@ -257,14 +257,14 @@ func Main(opts ...Option) error { var drainerTLS *pkghandler.Drainer // Enable TLS when certificate is mounted. - tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) + encryptionEnabled := exists(logger, certPath) && exists(logger, keyPath) // At this moment activator with TLS does not disable HTTP. // Start main service regardless if tlsEnabled is true // See also https://github.com/knative/serving/issues/12808. mainServer, drainer := buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) srvs = append(srvs, mainServer) - if tlsEnabled { + if encryptionEnabled { var mainServerTLS service mainServerTLS, drainerTLS = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) srvs = append(srvs, mainServerTLS) From b2653ac2e8b62615e40204649a675a6e22c16c9f Mon Sep 17 00:00:00 2001 From: David Hadas Date: Wed, 8 Mar 2023 07:24:36 +0200 Subject: [PATCH 12/15] align naming Signed-off-by: David Hadas --- pkg/queue/sharedmain/main.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index d095a9067f67..5d2684e26f1c 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -262,17 +262,17 @@ func Main(opts ...Option) error { // At this moment activator with TLS does not disable HTTP. // Start main service regardless if tlsEnabled is true // See also https://github.com/knative/serving/issues/12808. - mainServer, drainer := buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) - srvs = append(srvs, mainServer) + mainService, drainer := buildMainService(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) + srvs = append(srvs, mainService) if encryptionEnabled { - var mainServerTLS service - mainServerTLS, drainerTLS = buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) - srvs = append(srvs, mainServerTLS) + var mainServiceTLS service + mainServiceTLS, drainerTLS = buildMainService(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) + srvs = append(srvs, mainServiceTLS) } - srvs = append(srvs, buildAdminServer(d.Ctx, logger, drainer, drainerTLS)) - srvs = append(srvs, buildMetricsServer(protoStatReporter)) + srvs = append(srvs, buildAdminService(d.Ctx, logger, drainer, drainerTLS)) + srvs = append(srvs, buildMetricsService(protoStatReporter)) if env.EnableProfiling { - srvs = append(srvs, buildProfilingServer(logger)) + srvs = append(srvs, buildProfilingService(logger)) } logger.Info("Starting queue-proxy") @@ -343,8 +343,8 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 return readiness.NewProbe(coreProbe) } -func buildServer(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, - ce *queue.ConcurrencyEndpoint, enableTLS bool) (server service, drainer *pkghandler.Drainer) { +func buildMainService(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, + ce *queue.ConcurrencyEndpoint, enableTLS bool) (mainService service, drainer *pkghandler.Drainer) { // TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health. target := net.JoinHostPort("127.0.0.1", env.UserPort) @@ -416,15 +416,15 @@ func buildServer(ctx context.Context, env config, transport http.RoundTripper, p var port string if enableTLS { - server.name = "mainTls" + mainService.name = "mainTls" port = env.QueueServingTLSPort } else { - server.name = "main" + mainService.name = "main" port = env.QueueServingPort } - server.srv = pkgnet.NewServer(":"+port, composedHandler) - server.tls = enableTLS - return server, drainer + mainService.srv = pkgnet.NewServer(":"+port, composedHandler) + mainService.tls = enableTLS + return mainService, drainer } func buildTransport(env config) http.RoundTripper { @@ -499,7 +499,7 @@ func drain(drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer) { wg.Wait() } -func buildAdminServer(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer) (adminService service) { +func buildAdminService(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer) (adminService service) { adminMux := http.NewServeMux() adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) { logger.Info("Attached drain handler from user-container", r) @@ -529,7 +529,7 @@ func buildAdminServer(ctx context.Context, logger *zap.SugaredLogger, drainer *p return } -func buildMetricsServer(protobufStatReporter *queue.ProtobufStatsReporter) (metrixService service) { +func buildMetricsService(protobufStatReporter *queue.ProtobufStatsReporter) (metrixService service) { metricsMux := http.NewServeMux() metricsMux.Handle("/metrics", queue.NewStatsHandler(protobufStatReporter)) @@ -543,7 +543,7 @@ func buildMetricsServer(protobufStatReporter *queue.ProtobufStatsReporter) (metr return } -func buildProfilingServer(logger *zap.SugaredLogger) (profilingService service) { +func buildProfilingService(logger *zap.SugaredLogger) (profilingService service) { profilingService.name = "profile" profilingService.tls = false profilingService.srv = profiling.NewServer(profiling.NewHandler(logger, true)) From 7c7825f2da88557f2180a1e1b75fc7ad7f92cac1 Mon Sep 17 00:00:00 2001 From: David Hadas Date: Thu, 16 Mar 2023 13:49:48 +0200 Subject: [PATCH 13/15] changes based on review comments + admin to use TLS (when used) --- pkg/queue/sharedmain/main.go | 105 +++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index 5d2684e26f1c..5762790123b5 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -81,8 +81,8 @@ const ( // This path is used by QP Options (Extensions) as / QPOptionTokenDirPath = queue.TokenDirectory - // ShutdownLimit is the maximum time we wait for a server to shutdown after a Kill - ShutdownLimit = 5 * time.Second + // shutdownLimit is the maximum time we wait for a server to shutdown after a Kill + shutdownLimit = 5 * time.Second ) type config struct { @@ -168,7 +168,7 @@ type Defaults struct { Transport http.RoundTripper } -type service struct { +type httpServer struct { name string srv *http.Server tls bool @@ -253,33 +253,33 @@ func Main(opts ...Option) error { concurrencyendpoint = queue.NewConcurrencyEndpoint(env.ConcurrencyStateEndpoint, env.ConcurrencyStateTokenPath) } - var srvs []service + var srvs []httpServer var drainerTLS *pkghandler.Drainer // Enable TLS when certificate is mounted. encryptionEnabled := exists(logger, certPath) && exists(logger, keyPath) // At this moment activator with TLS does not disable HTTP. - // Start main service regardless if tlsEnabled is true + // Start main HttpServer regardless if tlsEnabled is true // See also https://github.com/knative/serving/issues/12808. - mainService, drainer := buildMainService(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) - srvs = append(srvs, mainService) + mainHttpServer, drainer := buildMainHttpServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) + srvs = append(srvs, mainHttpServer) if encryptionEnabled { - var mainServiceTLS service - mainServiceTLS, drainerTLS = buildMainService(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) - srvs = append(srvs, mainServiceTLS) + var mainHttpServerTLS httpServer + mainHttpServerTLS, drainerTLS = buildMainHttpServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) + srvs = append(srvs, mainHttpServerTLS) } - srvs = append(srvs, buildAdminService(d.Ctx, logger, drainer, drainerTLS)) - srvs = append(srvs, buildMetricsService(protoStatReporter)) + srvs = append(srvs, buildAdminHttpServer(d.Ctx, logger, drainer, drainerTLS, encryptionEnabled)) + srvs = append(srvs, buildMetricsHttpServer(protoStatReporter)) if env.EnableProfiling { - srvs = append(srvs, buildProfilingService(logger)) + srvs = append(srvs, buildProfilingHttpServer(logger)) } logger.Info("Starting queue-proxy") errCh := make(chan error) for _, s := range srvs { - go func(s service) { - logger.Debugf("Starting sever service %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) + go func(s httpServer) { + logger.Debugf("Starting HttpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) // Don't forward ErrServerClosed as that indicates we're already shutting down. var err error if s.tls { @@ -290,7 +290,7 @@ func Main(opts ...Option) error { if err != nil && !errors.Is(err, http.ErrServerClosed) { errCh <- fmt.Errorf("%s server failed to serve: %w", s.name, err) } - logger.Debugf("Ended sever service %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) + logger.Debugf("Ended HttpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) }(s) } @@ -309,8 +309,8 @@ func Main(opts ...Option) error { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) drain(drainer, drainerTLS) - // Shutdown() to all services, wait no more than ShutdownLimit for graceful termination - shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), ShutdownLimit) + // Shutdown() to all HttpServers, wait no more than shutdownLimit for graceful termination + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownLimit) defer shutdownRelease() for _, s := range srvs { logger.Info("Shutting down server: ", s.name) @@ -343,8 +343,8 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 return readiness.NewProbe(coreProbe) } -func buildMainService(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, - ce *queue.ConcurrencyEndpoint, enableTLS bool) (mainService service, drainer *pkghandler.Drainer) { +func buildMainHttpServer(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, + ce *queue.ConcurrencyEndpoint, enableTLS bool) (httpServer, *pkghandler.Drainer) { // TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health. target := net.JoinHostPort("127.0.0.1", env.UserPort) @@ -399,7 +399,7 @@ func buildMainService(ctx context.Context, env config, transport http.RoundTripp composedHandler = tracing.HTTPSpanMiddleware(composedHandler) } - drainer = &pkghandler.Drainer{ + drainer := &pkghandler.Drainer{ QuietPeriod: drainSleepDuration, // Add Activator probe header to the drainer so it can handle probes directly from activator HealthCheckUAPrefixes: []string{netheader.ActivatorUserAgent, netheader.AutoscalingUserAgent}, @@ -414,17 +414,20 @@ func buildMainService(ctx context.Context, env config, transport http.RoundTripp composedHandler = requestLogHandler(logger, composedHandler, env) } - var port string + var port, name string if enableTLS { - mainService.name = "mainTls" + name = "mainTls" port = env.QueueServingTLSPort } else { - mainService.name = "main" + name = "main" port = env.QueueServingPort } - mainService.srv = pkgnet.NewServer(":"+port, composedHandler) - mainService.tls = enableTLS - return mainService, drainer + + return httpServer{ + name: name, + tls: enableTLS, + srv: pkgnet.NewServer(":"+port, composedHandler), + }, drainer } func buildTransport(env config) http.RoundTripper { @@ -499,7 +502,7 @@ func drain(drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer) { wg.Wait() } -func buildAdminService(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer) (adminService service) { +func buildAdminHttpServer(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer, enableTLS bool) httpServer { adminMux := http.NewServeMux() adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) { logger.Info("Attached drain handler from user-container", r) @@ -519,35 +522,39 @@ func buildAdminService(ctx context.Context, logger *zap.SugaredLogger, drainer * drain(drainer, drainerTLS) w.WriteHeader(http.StatusOK) }) - adminService.tls = false - adminService.name = "admin" - adminService.srv = &http.Server{ - Addr: ":" + strconv.Itoa(networking.QueueAdminPort), - Handler: adminMux, - ReadHeaderTimeout: time.Minute, //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 - } - return + + return httpServer{ + name: "admin", + tls: enableTls, + srv: &http.Server{ + Addr: ":" + strconv.Itoa(networking.QueueAdminPort), + Handler: adminMux, + ReadHeaderTimeout: time.Minute, //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 + }, + } } -func buildMetricsService(protobufStatReporter *queue.ProtobufStatsReporter) (metrixService service) { +func buildMetricsHttpServer(protobufStatReporter *queue.ProtobufStatsReporter) httpServer { metricsMux := http.NewServeMux() metricsMux.Handle("/metrics", queue.NewStatsHandler(protobufStatReporter)) - metrixService.name = "metrics" - metrixService.tls = false - metrixService.srv = &http.Server{ - Addr: ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort), - Handler: metricsMux, - ReadHeaderTimeout: time.Minute, //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 + return httpServer{ + name: "metrics", + tls: false, + srv: &http.Server{ + Addr: ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort), + Handler: metricsMux, + ReadHeaderTimeout: time.Minute, //https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6 + }, } - return } -func buildProfilingService(logger *zap.SugaredLogger) (profilingService service) { - profilingService.name = "profile" - profilingService.tls = false - profilingService.srv = profiling.NewServer(profiling.NewHandler(logger, true)) - return +func buildProfilingHttpServer(logger *zap.SugaredLogger) httpServer { + return httpServer{ + name: "profile", + tls: false, + srv: profiling.NewServer(profiling.NewHandler(logger, true)), + } } func requestLogHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env config) http.Handler { From 3a6bf3940e9802559509d883d4c059151dcaa13b Mon Sep 17 00:00:00 2001 From: David Hadas Date: Thu, 16 Mar 2023 14:02:23 +0200 Subject: [PATCH 14/15] changes based on review comments + admin to use TLS (when used) --- pkg/queue/sharedmain/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index 5762790123b5..c8c5d3d6adf5 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -265,6 +265,7 @@ func Main(opts ...Option) error { mainHttpServer, drainer := buildMainHttpServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) srvs = append(srvs, mainHttpServer) if encryptionEnabled { + // add mainTls on top of the main httpServer var mainHttpServerTLS httpServer mainHttpServerTLS, drainerTLS = buildMainHttpServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) srvs = append(srvs, mainHttpServerTLS) @@ -525,7 +526,7 @@ func buildAdminHttpServer(ctx context.Context, logger *zap.SugaredLogger, draine return httpServer{ name: "admin", - tls: enableTls, + tls: enableTLS, srv: &http.Server{ Addr: ":" + strconv.Itoa(networking.QueueAdminPort), Handler: adminMux, From 9f070f6ade0853c4c8347f97949b0507b3602c25 Mon Sep 17 00:00:00 2001 From: David Hadas Date: Thu, 16 Mar 2023 14:11:24 +0200 Subject: [PATCH 15/15] changes based on review comments + admin to use TLS (when used) --- pkg/queue/sharedmain/main.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index c8c5d3d6adf5..99c194f137cd 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -260,27 +260,27 @@ func Main(opts ...Option) error { encryptionEnabled := exists(logger, certPath) && exists(logger, keyPath) // At this moment activator with TLS does not disable HTTP. - // Start main HttpServer regardless if tlsEnabled is true + // Start main httpServer regardless if tlsEnabled is true // See also https://github.com/knative/serving/issues/12808. - mainHttpServer, drainer := buildMainHttpServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) - srvs = append(srvs, mainHttpServer) + mainHTTPServer, drainer := buildMainHTTPServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false) + srvs = append(srvs, mainHTTPServer) if encryptionEnabled { // add mainTls on top of the main httpServer - var mainHttpServerTLS httpServer - mainHttpServerTLS, drainerTLS = buildMainHttpServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) - srvs = append(srvs, mainHttpServerTLS) + var mainHTTPServerTLS httpServer + mainHTTPServerTLS, drainerTLS = buildMainHTTPServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true) + srvs = append(srvs, mainHTTPServerTLS) } - srvs = append(srvs, buildAdminHttpServer(d.Ctx, logger, drainer, drainerTLS, encryptionEnabled)) - srvs = append(srvs, buildMetricsHttpServer(protoStatReporter)) + srvs = append(srvs, buildAdminHTTPServer(d.Ctx, logger, drainer, drainerTLS, encryptionEnabled)) + srvs = append(srvs, buildMetricsHTTPServer(protoStatReporter)) if env.EnableProfiling { - srvs = append(srvs, buildProfilingHttpServer(logger)) + srvs = append(srvs, buildProfilingHTTPServer(logger)) } logger.Info("Starting queue-proxy") errCh := make(chan error) for _, s := range srvs { go func(s httpServer) { - logger.Debugf("Starting HttpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) + logger.Debugf("Starting httpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) // Don't forward ErrServerClosed as that indicates we're already shutting down. var err error if s.tls { @@ -291,7 +291,7 @@ func Main(opts ...Option) error { if err != nil && !errors.Is(err, http.ErrServerClosed) { errCh <- fmt.Errorf("%s server failed to serve: %w", s.name, err) } - logger.Debugf("Ended HttpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) + logger.Debugf("Ended httpServer %s on port %s with tls %t", s.name, s.srv.Addr, s.tls) }(s) } @@ -310,7 +310,7 @@ func Main(opts ...Option) error { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) drain(drainer, drainerTLS) - // Shutdown() to all HttpServers, wait no more than shutdownLimit for graceful termination + // Shutdown() to all httpServers, wait no more than shutdownLimit for graceful termination shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownLimit) defer shutdownRelease() for _, s := range srvs { @@ -344,7 +344,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 return readiness.NewProbe(coreProbe) } -func buildMainHttpServer(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, +func buildMainHTTPServer(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, ce *queue.ConcurrencyEndpoint, enableTLS bool) (httpServer, *pkghandler.Drainer) { // TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health. @@ -503,7 +503,7 @@ func drain(drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer) { wg.Wait() } -func buildAdminHttpServer(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer, enableTLS bool) httpServer { +func buildAdminHTTPServer(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer, drainerTLS *pkghandler.Drainer, enableTLS bool) httpServer { adminMux := http.NewServeMux() adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) { logger.Info("Attached drain handler from user-container", r) @@ -535,7 +535,7 @@ func buildAdminHttpServer(ctx context.Context, logger *zap.SugaredLogger, draine } } -func buildMetricsHttpServer(protobufStatReporter *queue.ProtobufStatsReporter) httpServer { +func buildMetricsHTTPServer(protobufStatReporter *queue.ProtobufStatsReporter) httpServer { metricsMux := http.NewServeMux() metricsMux.Handle("/metrics", queue.NewStatsHandler(protobufStatReporter)) @@ -550,7 +550,7 @@ func buildMetricsHttpServer(protobufStatReporter *queue.ProtobufStatsReporter) h } } -func buildProfilingHttpServer(logger *zap.SugaredLogger) httpServer { +func buildProfilingHTTPServer(logger *zap.SugaredLogger) httpServer { return httpServer{ name: "profile", tls: false,