Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added cmd/activator/.main.go.swp
Binary file not shown.
33 changes: 32 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"log"
Expand Down Expand Up @@ -153,6 +154,36 @@ func main() {
logger.Fatalw("Failed to construct network config", zap.Error(err))
}

// Enable TLS against queue-proxy when the CA and SA are specified.
tlsEnabled := networkConfig.QueueProxyCA != "" && networkConfig.QueueProxySAN != ""

// Enable TLS client when queue-proxy-ca is specified.
// At this moment activator with TLS does not disable HTTP.
// See also https://github.com/knative/serving/issues/12808.
if tlsEnabled {
caSecret, err := kubeClient.CoreV1().Secrets(system.Namespace()).Get(ctx, networkConfig.QueueProxyCA, metav1.GetOptions{})
if err != nil {
logger.Fatalw("Failed to get secret", zap.Error(err))
}

pool, err := x509.SystemCertPool()
if err != nil {
pool = x509.NewCertPool()
}

if ok := pool.AppendCertsFromPEM(caSecret.Data["ca.crt"]); !ok {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the secret exists but doesn't have the right data, this will fail.

Do we need to undo the base64 encoding here before AppendCertsFromPEM? (I don't recall what the client-go libraries assist with here.)

logger.Fatalw("Failed to append ca cert to the RootCAs")
}

tlsConf := &tls.Config{
RootCAs: pool,
InsecureSkipVerify: false,
ServerName: networkConfig.QueueProxySAN,
MinVersion: tls.VersionTLS12,
}
transport = pkgnet.NewProxyAutoTLSTransport(env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost, tlsConf)
}

// Start throttler.
throttler := activatornet.NewThrottler(ctx, env.PodIP)
go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode)
Expand Down Expand Up @@ -188,7 +219,7 @@ func main() {

// Create activation handler chain
// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first
ah := activatorhandler.New(ctx, throttler, transport, networkConfig.EnableMeshPodAddressability, logger)
ah := activatorhandler.New(ctx, throttler, transport, networkConfig.EnableMeshPodAddressability, logger, tlsEnabled)
ah = concurrencyReporter.Handler(ah)
ah = activatorhandler.NewTracingHandler(ah)
reqLogHandler, err := pkghttp.NewRequestLogHandler(ah, logging.NewSyncFileWriter(os.Stdout), "",
Expand Down
75 changes: 63 additions & 12 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,18 @@ const (
// This is to give networking a little bit more time to remove the pod
// from its configuration and propagate that to all loadbalancers and nodes.
drainSleepDuration = 30 * time.Second

// certPath is the path for the server certificate mounted by queue-proxy.
certPath = queue.CertDirectory + "/tls.crt"

// keyPath is the path for the server certificate key mounted by queue-proxy.
keyPath = queue.CertDirectory + "/tls.key"
)

type config struct {
ContainerConcurrency int `split_words:"true" required:"true"`
QueueServingPort string `split_words:"true" required:"true"`
QueueServingTLSPort string `split_words:"true" required:"true"`
UserPort string `split_words:"true" required:"true"`
RevisionTimeoutSeconds int `split_words:"true" required:"true"`
MaxDurationSeconds int `split_words:"true"` // optional
Expand Down Expand Up @@ -162,25 +169,52 @@ func main() {
if env.ConcurrencyStateEndpoint != "" {
concurrencyendpoint = queue.NewConcurrencyEndpoint(env.ConcurrencyStateEndpoint, env.ConcurrencyStateTokenPath)
}
mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint)
servers := map[string]*http.Server{

// Enable TLS when certificate is mounted.
tlsEnabled := exists(logger, certPath) && exists(logger, keyPath)

mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false)
httpServers := map[string]*http.Server{
"main": mainServer,
"admin": buildAdminServer(logger, drain),
"metrics": buildMetricsServer(promStatReporter, protoStatReporter),
"admin": buildAdminServer(logger, drain),
}
if env.EnableProfiling {
servers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true))
httpServers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true))
}

// Enable TLS server when activator server certs are mounted.
// At this moment activator with TLS does not disable HTTP.
// See also https://github.com/knative/serving/issues/12808.
var tlsServers map[string]*http.Server
if tlsEnabled {
mainTLSServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the drain from line 176 is lost?

Again, okay for pre-alpha, but it would be nice to unify the drains post 1.4

tlsServers = map[string]*http.Server{
"tlsMain": mainTLSServer,
"tlsAdmin": buildAdminServer(logger, drain),
}
// Drop admin http server as we Use TLS for the admin server.
// TODO: The drain created with mainServer above is lost. Unify the two drain.
delete(httpServers, "admin")
}

errCh := make(chan error)
for name, server := range servers {
for name, server := range httpServers {
go func(name string, s *http.Server) {
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := s.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- fmt.Errorf("%s server failed to serve: %w", name, err)
}
}(name, server)
}
for name, server := range tlsServers {
go func(name string, s *http.Server) {
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := s.ListenAndServeTLS(certPath, keyPath); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- fmt.Errorf("%s server failed to serve: %w", name, err)
}
}(name, server)
}

// Blocks until we actually receive a TERM signal or one of the servers
// exits unexpectedly. We fold both signals together because we only want
Expand All @@ -200,9 +234,9 @@ func main() {
drain()

// Removing the main server from the shutdown logic as we've already shut it down.
delete(servers, "main")
delete(httpServers, "main")

for serverName, srv := range servers {
for serverName, srv := range httpServers {
logger.Info("Shutting down server: ", serverName)
if err := srv.Shutdown(context.Background()); err != nil {
logger.Errorw("Failed to shutdown server", zap.String("server", serverName), zap.Error(err))
Expand All @@ -212,6 +246,14 @@ func main() {
}
}

func exists(logger *zap.SugaredLogger, filename string) bool {
_, err := os.Stat(filename)
if err != nil && !os.IsNotExist(err) {
logger.Fatalw(fmt.Sprintf("Failed to verify the file path %q", filename), zap.Error(err))
}
return err == nil
}

func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 bool) *readiness.Probe {
coreProbe, err := readiness.DecodeProbe(encodedProbe)
if err != nil {
Expand All @@ -224,18 +266,20 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
}

func buildServer(ctx context.Context, env config, probeContainer func() bool, stats *network.RequestStats, logger *zap.SugaredLogger,
ce *queue.ConcurrencyEndpoint) (server *http.Server, drain func()) {
ce *queue.ConcurrencyEndpoint, enableTLS bool) (server *http.Server, drain func()) {
Comment thread
nak3 marked this conversation as resolved.
// TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health.

target := net.JoinHostPort("127.0.0.1", env.UserPort)

httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders)
httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */)
httpProxy.Transport = buildTransport(env, logger)
httpProxy.ErrorHandler = pkghandler.Error(logger)
httpProxy.BufferPool = network.NewBufferPool()
httpProxy.FlushInterval = network.FlushInterval

// TODO: During HTTP and HTTPS transition, counting concurrency could not be accurate. Count accurately.
breaker := buildBreaker(logger, env)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we won't be counting concurrency properly across HTTP and HTTPS while the activator change rolls out (but again, a post-1.4 note, just worth adding a TODO here).

metricsSupported := supportsMetrics(ctx, logger, env)
metricsSupported := supportsMetrics(ctx, logger, env, enableTLS)
tracingEnabled := env.TracingConfigBackend != tracingconfig.None
concurrencyStateEnabled := env.ConcurrencyStateEndpoint != ""
firstByteTimeout := time.Duration(env.RevisionTimeoutSeconds) * time.Second
Expand Down Expand Up @@ -287,6 +331,10 @@ func buildServer(ctx context.Context, env config, probeContainer func() bool, st
composedHandler = requestLogHandler(logger, composedHandler, env)
}

if enableTLS {
return pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler), drainer.Drain
}

return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer.Drain
}

Expand Down Expand Up @@ -333,12 +381,15 @@ func buildBreaker(logger *zap.SugaredLogger, env config) *queue.Breaker {
return queue.NewBreaker(params)
}

func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config) bool {
func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config, enableTLS bool) bool {
// Keep it on HTTP because Metrics needs to be registered on either TLS server or non-TLS server.
if enableTLS {
return false
}
// Setup request metrics reporting for end-user metrics.
if env.ServingRequestMetricsBackend == "" {
return false
}

if err := setupMetricsExporter(ctx, logger, env.ServingRequestMetricsBackend, env.MetricsCollectorAddress); err != nil {
logger.Errorw("Error setting up request metrics exporter. Request metrics will be unavailable.", zap.Error(err))
return false
Expand Down
24 changes: 22 additions & 2 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"net/http"
"net/http/httputil"
"strconv"
"strings"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/trace"
Expand All @@ -35,6 +37,7 @@ import (
"knative.dev/serving/pkg/activator"
activatorconfig "knative.dev/serving/pkg/activator/config"
pkghttp "knative.dev/serving/pkg/http"
"knative.dev/serving/pkg/networking"
"knative.dev/serving/pkg/queue"
"knative.dev/serving/pkg/reconciler/serverlessservice/resources/names"
)
Expand All @@ -53,10 +56,11 @@ type activationHandler struct {
throttler Throttler
bufferPool httputil.BufferPool
logger *zap.SugaredLogger
tls bool
}

// New constructs a new http.Handler that deals with revision activation.
func New(_ context.Context, t Throttler, transport http.RoundTripper, usePassthroughLb bool, logger *zap.SugaredLogger) http.Handler {
func New(_ context.Context, t Throttler, transport http.RoundTripper, usePassthroughLb bool, logger *zap.SugaredLogger, tlsEnabled bool) http.Handler {
return &activationHandler{
transport: transport,
tracingTransport: &ochttp.Transport{
Expand All @@ -67,6 +71,7 @@ func New(_ context.Context, t Throttler, transport http.RoundTripper, usePassthr
throttler: t,
bufferPool: network.NewBufferPool(),
logger: logger,
tls: tlsEnabled,
}
}

Expand Down Expand Up @@ -116,7 +121,14 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp
if usePassthroughLb {
hostOverride = names.PrivateService(revID.Name) + "." + revID.Namespace
}
proxy := pkghttp.NewHeaderPruningReverseProxy(target, hostOverride, activator.RevisionHeaders)

var proxy *httputil.ReverseProxy
if a.tls {
proxy = pkghttp.NewHeaderPruningReverseProxy(useSecurePort(target), hostOverride, activator.RevisionHeaders, true /* uss HTTPS */)
} else {
proxy = pkghttp.NewHeaderPruningReverseProxy(target, hostOverride, activator.RevisionHeaders, false /* use HTTPS */)
}

proxy.BufferPool = a.bufferPool
proxy.Transport = a.transport
if tracingEnabled {
Expand All @@ -129,3 +141,11 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp

proxy.ServeHTTP(w, r)
}

// useSecurePort replaces the default port with HTTPS port (8112).
// TODO: endpointsToDests() should support HTTPS instead of this overwrite but it needs metadata request to be encrypted.
// This code should be removed when https://github.com/knative/serving/issues/12821 was solved.
func useSecurePort(target string) string {
target = strings.Split(target, ":")[0]
return target + ":" + strconv.Itoa(networking.BackendHTTPSPort)
}
10 changes: 5 additions & 5 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestActivationHandler(t *testing.T) {

ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()
handler := New(ctx, test.throttler, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, test.throttler, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a test where this is true?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is not possible to add the HTTPs proxy in this test code. TestNewHeaderPruningProxyHTTPS covered the part instead.


resp := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestActivationHandlerProxyHeader(t *testing.T) {
ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()

handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestActivationHandlerPassthroughLb(t *testing.T) {
ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()

handler := New(ctx, fakeThrottler{}, rt, true /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, true /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestActivationHandlerTraceSpans(t *testing.T) {
oct.Finish()
}()

handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

// Set up config store to populate context.
configStore := setupConfigStore(t, logging.FromContext(ctx))
Expand Down Expand Up @@ -345,7 +345,7 @@ func BenchmarkHandler(b *testing.B) {
}, nil
})

handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

request := func() *http.Request {
req := httptest.NewRequest(http.MethodGet, "http://example.com", nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/activator/handler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func BenchmarkHandlerChain(b *testing.B) {
})

// Make sure to update this if the activator's main file changes.
ah := New(ctx, fakeThrottler{}, rt, false, logger)
ah := New(ctx, fakeThrottler{}, rt, false, logger, false /* TLS */)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the overhead if tls is true, does it make sense to have run a benchmark with that too?

ah = concurrencyReporter.Handler(ah)
ah = NewTracingHandler(ah)
ah, _ = pkghttp.NewRequestLogHandler(ah, io.Discard, "", nil, false)
Expand Down
8 changes: 6 additions & 2 deletions pkg/http/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ const NoHostOverride = ""
// If hostOverride is not an empty string, the outgoing request's Host header will be
// replaced with that explicit value and the passthrough loadbalancing header will be
// set to enable pod-addressability.
func NewHeaderPruningReverseProxy(target, hostOverride string, headersToRemove []string) *httputil.ReverseProxy {
func NewHeaderPruningReverseProxy(target, hostOverride string, headersToRemove []string, useHTTPS bool) *httputil.ReverseProxy {
return &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
if useHTTPS {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}
req.URL.Host = target

if hostOverride != NoHostOverride {
Expand Down
Loading