Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/kind-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:
- v1.22.2/istio
- v1.23.0/contour
- v1.23.0/gateway-api
- v1.23.0/kourier-tls

test-suite:
- ./test/conformance/runtime
Expand Down Expand Up @@ -60,6 +61,13 @@ jobs:
kingress: gateway-api
test-flags: "--enable-alpha"
cluster-suffix: c${{ github.run_id }}.local
- cluster: v1.23.0/kourier-tls
k8s-version: v1.23.0
kind-version: v0.11.1
kind-image-sha: sha256:49824ab1727c04e56a21a5d8372a402fcd32ea51ac96a2706a12af38934f81ac
kingress: kourier
test-flags: "--enable-alpha --enable-beta"
cluster-suffix: c${{ github.run_id }}.local

env:
GOPATH: ${{ github.workspace }}
Expand Down Expand Up @@ -196,6 +204,11 @@ jobs:
INGRESS_CLASS="${{ matrix.kingress }}.ingress.networking.knative.dev"
CLUSTER_DOMAIN="${{ matrix.cluster-suffix }}"

if [[ "${{ matrix.cluster }}" == *-tls ]]; then
echo 'Enabling TLS'
ENABLE_TLS=1
fi

knative_setup
test_setup

Expand Down
5 changes: 4 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ func main() {

// Create activation handler chain
// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first
ah := activatorhandler.New(ctx, throttler, transport, networkConfig.EnableMeshPodAddressability, logger)

// Disable TLS for now.
tlsEnabled := false
ah := activatorhandler.New(ctx, throttler, transport, networkConfig.EnableMeshPodAddressability, logger, tlsEnabled)
ah = concurrencyReporter.Handler(ah)
ah = activatorhandler.NewTracingHandler(ah)
reqLogHandler, err := pkghttp.NewRequestLogHandler(ah, logging.NewSyncFileWriter(os.Stdout), "",
Expand Down
51 changes: 46 additions & 5 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ const (
// This is to give networking a little bit more time to remove the pod
// from its configuration and propagate that to all loadbalancers and nodes.
drainSleepDuration = 30 * time.Second

certPath = queue.CertDirectory + "/tls.crt"
keyPath = queue.CertDirectory + "/tls.key"
)

type config struct {
ContainerConcurrency int `split_words:"true" required:"true"`
QueueServingPort string `split_words:"true" required:"true"`
QueueServingTLSPort string `split_words:"true" required:"true"`
UserPort string `split_words:"true" required:"true"`
RevisionTimeoutSeconds int `split_words:"true" required:"true"`
MaxDurationSeconds int `split_words:"true"` // optional
Expand Down Expand Up @@ -162,16 +166,19 @@ func main() {
if env.ConcurrencyStateEndpoint != "" {
concurrencyendpoint = queue.NewConcurrencyEndpoint(env.ConcurrencyStateEndpoint, env.ConcurrencyStateTokenPath)
}
mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint)
mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false)
servers := map[string]*http.Server{
"main": mainServer,
"admin": buildAdminServer(logger, drain),
"metrics": buildMetricsServer(promStatReporter, protoStatReporter),
}
if env.EnableProfiling {
servers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true))
}

if !exists(logger, certPath) || !exists(logger, keyPath) {
servers["admin"] = buildAdminServer(logger, drain)
}

errCh := make(chan error)
for name, server := range servers {
go func(name string, s *http.Server) {
Expand All @@ -182,6 +189,25 @@ func main() {
}(name, server)
}

// Enable TLS server when activator server certs are mounted.
// At this moment activator with TLS does not disable HTTP.
// See also https://github.com/knative/serving/issues/12808.
if exists(logger, certPath) && exists(logger, keyPath) {
mainTLSServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
tlsServers := map[string]*http.Server{
"tlsMain": mainTLSServer,
"tlsAdmin": buildAdminServer(logger, drain),
}
for name, server := range tlsServers {
go func(name string, s *http.Server) {
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := s.ListenAndServeTLS(certPath, keyPath); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- fmt.Errorf("%s server failed to serve: %w", name, err)
}
}(name, server)
}
}

// Blocks until we actually receive a TERM signal or one of the servers
// exits unexpectedly. We fold both signals together because we only want
// to act on the first of those to reach here.
Expand Down Expand Up @@ -212,6 +238,14 @@ func main() {
}
}

func exists(logger *zap.SugaredLogger, filename string) bool {
_, err := os.Stat(filename)
if err != nil && !os.IsNotExist(err) {
logger.Fatalw(fmt.Sprintf("Failed to verify the file path %q", filename), zap.Error(err))
}
return err == nil
}

func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 bool) *readiness.Probe {
coreProbe, err := readiness.DecodeProbe(encodedProbe)
if err != nil {
Expand All @@ -224,18 +258,22 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
}

func buildServer(ctx context.Context, env config, probeContainer func() bool, stats *network.RequestStats, logger *zap.SugaredLogger,
ce *queue.ConcurrencyEndpoint) (server *http.Server, drain func()) {
ce *queue.ConcurrencyEndpoint, enableTLS bool) (server *http.Server, drain func()) {

target := net.JoinHostPort("127.0.0.1", env.UserPort)

httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders)
httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, "http" /* use http to the target*/)
httpProxy.Transport = buildTransport(env, logger)
httpProxy.ErrorHandler = pkghandler.Error(logger)
httpProxy.BufferPool = network.NewBufferPool()
httpProxy.FlushInterval = network.FlushInterval

breaker := buildBreaker(logger, env)
metricsSupported := supportsMetrics(ctx, logger, env)
// Metrics HTTP and non-HTTPS
if enableTLS {
metricsSupported = false
}
tracingEnabled := env.TracingConfigBackend != tracingconfig.None
concurrencyStateEnabled := env.ConcurrencyStateEndpoint != ""
firstByteTimeout := time.Duration(env.RevisionTimeoutSeconds) * time.Second
Expand Down Expand Up @@ -287,6 +325,10 @@ func buildServer(ctx context.Context, env config, probeContainer func() bool, st
composedHandler = requestLogHandler(logger, composedHandler, env)
}

if enableTLS {
return pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler), drainer.Drain
}

return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer.Drain
}

Expand Down Expand Up @@ -338,7 +380,6 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config)
if env.ServingRequestMetricsBackend == "" {
return false
}

if err := setupMetricsExporter(ctx, logger, env.ServingRequestMetricsBackend, env.MetricsCollectorAddress); err != nil {
logger.Errorw("Error setting up request metrics exporter. Request metrics will be unavailable.", zap.Error(err))
return false
Expand Down
21 changes: 19 additions & 2 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"net/http"
"net/http/httputil"
"strconv"
"strings"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/trace"
Expand All @@ -35,6 +37,7 @@ import (
"knative.dev/serving/pkg/activator"
activatorconfig "knative.dev/serving/pkg/activator/config"
pkghttp "knative.dev/serving/pkg/http"
"knative.dev/serving/pkg/networking"
"knative.dev/serving/pkg/queue"
"knative.dev/serving/pkg/reconciler/serverlessservice/resources/names"
)
Expand All @@ -53,10 +56,11 @@ type activationHandler struct {
throttler Throttler
bufferPool httputil.BufferPool
logger *zap.SugaredLogger
tls bool
}

// New constructs a new http.Handler that deals with revision activation.
func New(_ context.Context, t Throttler, transport http.RoundTripper, usePassthroughLb bool, logger *zap.SugaredLogger) http.Handler {
func New(_ context.Context, t Throttler, transport http.RoundTripper, usePassthroughLb bool, logger *zap.SugaredLogger, tlsEnabled bool) http.Handler {
return &activationHandler{
transport: transport,
tracingTransport: &ochttp.Transport{
Expand All @@ -67,6 +71,7 @@ func New(_ context.Context, t Throttler, transport http.RoundTripper, usePassthr
throttler: t,
bufferPool: network.NewBufferPool(),
logger: logger,
tls: tlsEnabled,
}
}

Expand Down Expand Up @@ -116,7 +121,12 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp
if usePassthroughLb {
hostOverride = names.PrivateService(revID.Name) + "." + revID.Namespace
}
proxy := pkghttp.NewHeaderPruningReverseProxy(target, hostOverride, activator.RevisionHeaders)
proxy := pkghttp.NewHeaderPruningReverseProxy(target, hostOverride, activator.RevisionHeaders, "http" /* use https to the target */)

// TODO: Use configmap or annotation to enable secure proxy.
if a.tls {
proxy = pkghttp.NewHeaderPruningReverseProxy(useSecurePort(target), hostOverride, activator.RevisionHeaders, "https" /* use https to the target */)
}
proxy.BufferPool = a.bufferPool
proxy.Transport = a.transport
if tracingEnabled {
Expand All @@ -129,3 +139,10 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp

proxy.ServeHTTP(w, r)
}

// useSecurePort replaces the default port with HTTPS port (8112).
// TODO: endpointsToDests() should support HTTPS instead of this hack but it needs metadata request to be encrypted.
func useSecurePort(target string) string {
target = strings.Split(target, ":")[0]
return target + ":" + strconv.Itoa(networking.BackendHTTPSPort)
}
10 changes: 5 additions & 5 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestActivationHandler(t *testing.T) {

ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()
handler := New(ctx, test.throttler, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, test.throttler, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

resp := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestActivationHandlerProxyHeader(t *testing.T) {
ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()

handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestActivationHandlerPassthroughLb(t *testing.T) {
ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()

handler := New(ctx, fakeThrottler{}, rt, true /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, true /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestActivationHandlerTraceSpans(t *testing.T) {
oct.Finish()
}()

handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

// Set up config store to populate context.
configStore := setupConfigStore(t, logging.FromContext(ctx))
Expand Down Expand Up @@ -345,7 +345,7 @@ func BenchmarkHandler(b *testing.B) {
}, nil
})

handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

request := func() *http.Request {
req := httptest.NewRequest(http.MethodGet, "http://example.com", nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/activator/handler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func BenchmarkHandlerChain(b *testing.B) {
})

// Make sure to update this if the activator's main file changes.
ah := New(ctx, fakeThrottler{}, rt, false, logger)
ah := New(ctx, fakeThrottler{}, rt, false, logger, false /* TLS */)
ah = concurrencyReporter.Handler(ah)
ah = NewTracingHandler(ah)
ah, _ = pkghttp.NewRequestLogHandler(ah, io.Discard, "", nil, false)
Expand Down
4 changes: 2 additions & 2 deletions pkg/http/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ const NoHostOverride = ""
// If hostOverride is not an empty string, the outgoing request's Host header will be
// replaced with that explicit value and the passthrough loadbalancing header will be
// set to enable pod-addressability.
func NewHeaderPruningReverseProxy(target, hostOverride string, headersToRemove []string) *httputil.ReverseProxy {
func NewHeaderPruningReverseProxy(target, hostOverride string, headersToRemove []string, scheme string) *httputil.ReverseProxy {
return &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Scheme = scheme
req.URL.Host = target

if hostOverride != NoHostOverride {
Expand Down
2 changes: 1 addition & 1 deletion pkg/http/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestNewHeaderPruningProxy(t *testing.T) {
proxy := NewHeaderPruningReverseProxy(serverURL.Host, test.host, []string{
"header-to-remove-1",
"header-to-remove-2",
})
}, "http")

resp := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, test.url, nil)
Expand Down
3 changes: 3 additions & 0 deletions pkg/networking/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
// BackendHTTP2Port is the backend, i.e. `targetPort` that we setup for HTTP/2 services.
BackendHTTP2Port = 8013

// BackendHTTPSPort is the backend. i.e. `targetPort` that we setup for HTTPS services.
BackendHTTPSPort = 8112

// QueueAdminPort specifies the port number for
// health check and lifecycle hooks for queue-proxy.
QueueAdminPort = 8022
Expand Down
3 changes: 3 additions & 0 deletions pkg/queue/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ const (
// Main usage is to delay the termination of user-container until all
// accepted requests have been processed.
RequestQueueDrainPath = "/wait-for-drain"

// CertDirectory is the name of the directory path where certificates are stored.
CertDirectory = "/var/lib/knative/certs"
)
22 changes: 22 additions & 0 deletions pkg/reconciler/revision/resources/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ var (
},
}

certVolumeMount = corev1.VolumeMount{
MountPath: queue.CertDirectory,
Name: "server-certs",
ReadOnly: true,
}

varTokenVolumeMount = corev1.VolumeMount{
Name: varTokenVolume.Name,
MountPath: concurrencyStateTokenVolumeMountPath,
Expand All @@ -89,6 +95,17 @@ var (
}
)

func certVolume(secret string) corev1.Volume {
return corev1.Volume{
Name: "server-certs",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secret,
},
},
}
}

func rewriteUserProbe(p *corev1.Probe, userPort int) {
if p == nil {
return
Expand Down Expand Up @@ -122,6 +139,11 @@ func makePodSpec(rev *v1.Revision, cfg *config.Config) (*corev1.PodSpec, error)
extraVolumes = append(extraVolumes, varTokenVolume)
}

if cfg.Network.QueueProxyServerCert != "" {
queueContainer.VolumeMounts = append(queueContainer.VolumeMounts, certVolumeMount)
extraVolumes = append(extraVolumes, certVolume(cfg.Network.QueueProxyServerCert))
}

podSpec := BuildPodSpec(rev, append(BuildUserContainers(rev), *queueContainer), cfg)
podSpec.Volumes = append(podSpec.Volumes, extraVolumes...)

Expand Down
Loading