diff --git a/cmd/queue/main.go b/cmd/queue/main.go index a63170b18de3..06e5c5a75221 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -31,6 +31,12 @@ import ( "github.com/kelseyhightower/envconfig" "github.com/pkg/errors" + "go.opencensus.io/stats" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/pkg/logging/logkey" + "knative.dev/pkg/metrics" + "knative.dev/pkg/signals" "knative.dev/serving/pkg/activator" activatorutil "knative.dev/serving/pkg/activator/util" "knative.dev/serving/pkg/apis/networking" @@ -40,16 +46,8 @@ import ( "knative.dev/serving/pkg/network" "knative.dev/serving/pkg/queue" "knative.dev/serving/pkg/queue/health" + "knative.dev/serving/pkg/queue/readiness" queuestats "knative.dev/serving/pkg/queue/stats" - - "go.opencensus.io/stats" - "go.uber.org/zap" - - "knative.dev/pkg/logging/logkey" - "knative.dev/pkg/metrics" - "knative.dev/pkg/signals" - - "k8s.io/apimachinery/pkg/util/wait" ) const ( @@ -67,10 +65,6 @@ const ( // in the mesh. quitSleepDuration = 20 * time.Second - // Set equal to the queue-proxy's ExecProbe timeout to take - // advantage of the full window - probeTimeout = 10 * time.Second - badProbeTemplate = "unexpected probe header value: %s" // Metrics' names (without component prefix). @@ -84,6 +78,11 @@ const ( requestQueueHealthPath = "/health" healthURLTemplate = "http://127.0.0.1:%d" + requestQueueHealthPath + tcpProbeTimeout = 100 * time.Millisecond + // The 25 millisecond retry interval is an unscientific compromise between wanting to get + // started as early as possible while still wanting to give the container some breathing + // room to get up and running. + aggressivePollInterval = 25 * time.Millisecond ) var ( @@ -98,8 +97,6 @@ var ( healthState = &health.State{} promStatReporter *queue.PrometheusStatsReporter // Prometheus stats reporter. - probe = flag.Bool("probe", false, "run readiness probe") - // Metric counters. requestCountM = stats.Int64( requestCountN, @@ -117,6 +114,7 @@ var ( appResponseTimeInMsecN, "The response time in millisecond", stats.UnitMilliseconds) + readinessProbeTimeout = flag.Int("probe-period", -1, "run readiness probe with given timeout") ) type config struct { @@ -138,6 +136,7 @@ type config struct { ServingLoggingLevel string `split_words:"true" required:"true"` ServingRequestMetricsBackend string `split_words:"true" required:"true"` ServingRequestLogTemplate string `split_words:"true" required:"true"` + ServingReadinessProbe string `split_words:"true" required:"true"` } func initConfig(env config) { @@ -174,29 +173,8 @@ func knativeProxyHeader(r *http.Request) string { return r.Header.Get(network.ProxyHeaderName) } -func probeUserContainer() bool { - var err error - wait.PollImmediate(50*time.Millisecond, probeTimeout, func() (bool, error) { - logger.Debug("TCP probing the user-container.") - config := health.TCPProbeConfigOptions{ - Address: userTargetAddress, - SocketTimeout: 100 * time.Millisecond, - } - err = health.TCPProbe(config) - return err == nil, nil - }) - - if err == nil { - logger.Info("User-container successfully probed.") - } else { - logger.Errorw("User-container could not be probed successfully.", zap.Error(err)) - } - - return err == nil -} - // Make handler a closure for testing. -func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.Handler) func(http.ResponseWriter, *http.Request) { +func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.Handler, prober func() bool) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { ph := knativeProbeHeader(r) switch { @@ -205,12 +183,17 @@ func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.H http.Error(w, fmt.Sprintf(badProbeTemplate, ph), http.StatusBadRequest) return } - if probeUserContainer() { - // Respond with the name of the component handling the request. - w.Write([]byte(queue.Name)) + if prober != nil { + if prober() { + // Respond with the name of the component handling the request. + w.Write([]byte(queue.Name)) + } else { + http.Error(w, "container not ready", http.StatusServiceUnavailable) + } } else { - http.Error(w, "container not ready", http.StatusServiceUnavailable) + http.Error(w, "no probe", http.StatusInternalServerError) } + return case network.IsKubeletProbe(r): // Do not count health checks for concurrency metrics @@ -243,40 +226,42 @@ func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.H } // Sets up /health and /wait-for-drain endpoints. -func createAdminHandlers() *http.ServeMux { +func createAdminHandlers(p *readiness.Probe) *http.ServeMux { mux := http.NewServeMux() - // TODO(@joshrider): temporary change while waiting on other PRs to merge (See #4014) - mux.HandleFunc(requestQueueHealthPath, healthState.HealthHandler(probeUserContainer, true /*isNotAggressive*/)) + + mux.HandleFunc(requestQueueHealthPath, healthState.HealthHandler(p.ProbeContainer, p.IsAggressive())) mux.HandleFunc(queue.RequestQueueDrainPath, healthState.DrainHandler()) return mux } - -func probeQueueHealthPath(port int, timeout time.Duration) error { +func probeQueueHealthPath(port int, timeoutSeconds int) error { url := fmt.Sprintf(healthURLTemplate, port) - + timeoutDuration := readiness.PollTimeout + if timeoutSeconds != 0 { + timeoutDuration = time.Duration(timeoutSeconds) * time.Second + } httpClient := &http.Client{ Transport: &http.Transport{ // Do not use the cached connection DisableKeepAlives: true, }, - Timeout: timeout, + Timeout: timeoutDuration, } + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + defer cancel() + stopCh := ctx.Done() var lastErr error - - // The 25 millisecond retry interval is an unscientific compromise between wanting to get - // started as early as possible while still wanting to give the container some breathing - // room to get up and running. - timeoutErr := wait.PollImmediate(25*time.Millisecond, timeout, func() (bool, error) { + // Using PollImmediateUntil instead of PollImmediate because if timeout is reached while waiting for first + // invocation of conditionFunc, it exits immediately without trying for a second time. + timeoutErr := wait.PollImmediateUntil(aggressivePollInterval, func() (bool, error) { var res *http.Response if res, lastErr = httpClient.Get(url); res == nil { return false, nil } defer res.Body.Close() - - return res.StatusCode == http.StatusOK, nil - }) + return health.IsHTTPProbeReady(res), nil + }, stopCh) if lastErr != nil { return errors.Wrap(lastErr, "failed to probe") @@ -293,8 +278,8 @@ func probeQueueHealthPath(port int, timeout time.Duration) error { func main() { flag.Parse() - if *probe { - if err := probeQueueHealthPath(networking.QueueAdminPort, probeTimeout); err != nil { + if *readinessProbeTimeout >= 0 { + if err := probeQueueHealthPath(networking.QueueAdminPort, *readinessProbeTimeout); err != nil { // used instead of the logger to produce a concise event message fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -356,9 +341,16 @@ func main() { StatChan: statChan, }, time.Now()) + coreProbe, err := readiness.DecodeProbe(env.ServingReadinessProbe) + if err != nil { + logger.Fatalw("Queue container failed to parse readiness probe", zap.Error(err)) + } + + rp := readiness.NewProbe(coreProbe, logger.With(zap.String(logkey.Key, "readinessProbe"))) + adminServer := &http.Server{ Addr: ":" + strconv.Itoa(networking.QueueAdminPort), - Handler: createAdminHandlers(), + Handler: createAdminHandlers(rp), } metricsSupported := false @@ -379,7 +371,7 @@ func main() { if metricsSupported { composedHandler = pushRequestMetricHandler(httpProxy, appRequestCountM, appResponseTimeInMsecM, env) } - composedHandler = http.HandlerFunc(handler(reqChan, breaker, composedHandler)) + composedHandler = http.HandlerFunc(handler(reqChan, breaker, composedHandler, rp.ProbeContainer)) composedHandler = queue.ForwardedShimHandler(composedHandler) composedHandler = queue.TimeToFirstByteTimeoutHandler(composedHandler, time.Duration(env.RevisionTimeoutSeconds)*time.Second, "request timeout") diff --git a/cmd/queue/main_test.go b/cmd/queue/main_test.go index 6db5344824d2..0cb0111f6928 100644 --- a/cmd/queue/main_test.go +++ b/cmd/queue/main_test.go @@ -30,8 +30,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - - logtesting "knative.dev/pkg/logging/testing" "knative.dev/serving/pkg/activator" "knative.dev/serving/pkg/network" "knative.dev/serving/pkg/queue" @@ -70,7 +68,7 @@ func TestHandlerReqEvent(t *testing.T) { params := queue.BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10} breaker := queue.NewBreaker(params) reqChan := make(chan queue.ReqEvent, 10) - h := handler(reqChan, breaker, proxy) + h := handler(reqChan, breaker, proxy, func() bool { return true }) writer := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "http://example.com", nil) @@ -91,48 +89,56 @@ func TestHandlerReqEvent(t *testing.T) { } } -func TestProberHandler(t *testing.T) { - defer logtesting.ClearAll() - logger = logtesting.TestLogger(t) - - // All arguments are needed only for serving. - h := handler(nil, nil, nil) - - writer := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodPost, "http://example.com", nil) - - req.Header.Set(network.ProbeHeaderName, "test-probe") - req.Header.Set(network.ProxyHeaderName, activator.Name) - - h(writer, req) - - // Should get 400. - if got, want := writer.Code, http.StatusBadRequest; got != want { - t.Errorf("Bad probe status = %v, want: %v", got, want) - } - if got, want := strings.TrimSpace(writer.Body.String()), fmt.Sprintf(badProbeTemplate, "test-probe"); got != want { - // \r\n might be inserted, etc. - t.Errorf("Bad probe body = %q, want: %q, diff: %s", got, want, cmp.Diff(got, want)) - } - - // Fix up the header. - writer = httptest.NewRecorder() - req.Header.Set(network.ProbeHeaderName, queue.Name) - - server := httptest.NewServer(http.HandlerFunc(h)) - defer server.Close() - userTargetAddress = strings.TrimPrefix(server.URL, "http://") - h(writer, req) - - // Should get 200. - if got, want := writer.Code, http.StatusOK; got != want { - t.Errorf("Good probe status = %v, want: %v", got, want) - } - - // Body should be the `queue`. - if got, want := strings.TrimSpace(writer.Body.String()), queue.Name; got != want { - // \r\n might be inserted, etc. - t.Errorf("Good probe body = %q, want: %q, diff: %s", got, want, cmp.Diff(got, want)) +func TestProbeHandler(t *testing.T) { + testcases := []struct { + name string + prober func() bool + wantCode int + wantBody string + requestHeader string + }{{ + name: "unexpected probe header", + prober: func() bool { return true }, + wantCode: http.StatusBadRequest, + wantBody: fmt.Sprintf(badProbeTemplate, "test-probe"), + requestHeader: "test-probe", + }, { + name: "true probe function", + prober: func() bool { return true }, + wantCode: http.StatusOK, + wantBody: queue.Name, + requestHeader: queue.Name, + }, { + name: "nil probe function", + prober: nil, + wantCode: http.StatusInternalServerError, + wantBody: "no probe", + requestHeader: queue.Name, + }, { + name: "false probe function", + prober: func() bool { return false }, + wantCode: http.StatusServiceUnavailable, + wantBody: "container not ready", + requestHeader: queue.Name, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + writer := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "http://example.com", nil) + req.Header.Set(network.ProbeHeaderName, tc.requestHeader) + + h := handler(nil, nil, nil, tc.prober) + h(writer, req) + + if got, want := writer.Code, tc.wantCode; got != want { + t.Errorf("probe status = %v, want: %v", got, want) + } + if got, want := strings.TrimSpace(writer.Body.String()), tc.wantBody; got != want { + // \r\n might be inserted, etc. + t.Errorf("probe body = %q, want: %q, diff: %s", got, want, cmp.Diff(got, want)) + } + }) } } @@ -164,7 +170,8 @@ func TestCreateVarLogLink(t *testing.T) { func TestProbeQueueConnectionFailure(t *testing.T) { port := 12345 // some random port (that's not listening) - if err := probeQueueHealthPath(port, time.Second); err == nil { + + if err := probeQueueHealthPath(port, 1); err == nil { t.Error("Expected error, got nil") } } @@ -188,7 +195,7 @@ func TestProbeQueueNotReady(t *testing.T) { t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err) } - err = probeQueueHealthPath(port, 1*time.Second) + err = probeQueueHealthPath(port, 1) if diff := cmp.Diff(err.Error(), "probe returned not ready"); diff != "" { t.Errorf("Unexpected not ready message: %s", diff) @@ -218,9 +225,41 @@ func TestProbeQueueReady(t *testing.T) { t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err) } - if err = probeQueueHealthPath(port, 1*time.Second); err != nil { - t.Errorf("probeQueueHealthPath(%d) = %s", port, err) + if err = probeQueueHealthPath(port, 1); err != nil { + t.Errorf("probeQueueHealthPath(%d, 1s) = %s", port, err) + } + + if !queueProbed { + t.Errorf("Expected the queue proxy server to be probed") } +} + +func TestProbeQueueTimeout(t *testing.T) { + queueProbed := false + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + queueProbed = true + time.Sleep(2 * time.Second) + w.WriteHeader(http.StatusOK) + })) + + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("%s is not a valid URL: %v", ts.URL, err) + } + + port, err := strconv.Atoi(u.Port()) + if err != nil { + t.Fatalf("failed to convert port(%s) to int", u.Port()) + } + + timeout := 1 + if err = probeQueueHealthPath(port, timeout); err == nil { + t.Errorf("Expected probeQueueHealthPath(%d, %v) to return timeout error", port, timeout) + } + + ts.Close() if !queueProbed { t.Errorf("Expected the queue proxy server to be probed") @@ -250,7 +289,8 @@ func TestProbeQueueDelayedReady(t *testing.T) { t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err) } - if err := probeQueueHealthPath(port, time.Second); err != nil { + timeout := 0 + if err := probeQueueHealthPath(port, timeout); err != nil { t.Errorf("probeQueueHealthPath(%d) = %s", port, err) } } diff --git a/pkg/apis/serving/k8s_validation.go b/pkg/apis/serving/k8s_validation.go index cc189a49e05b..b55cd9e04e63 100644 --- a/pkg/apis/serving/k8s_validation.go +++ b/pkg/apis/serving/k8s_validation.go @@ -283,7 +283,7 @@ func ValidateContainer(container corev1.Container, volumes sets.String) *apis.Fi // Ports errs = errs.Also(validateContainerPorts(container.Ports).ViaField("ports")) // Readiness Probes - errs = errs.Also(validateProbe(container.ReadinessProbe).ViaField("readinessProbe")) + errs = errs.Also(validateReadinessProbe(container.ReadinessProbe).ViaField("readinessProbe")) // Resources errs = errs.Also(validateResources(&container.Resources).ViaField("resources")) // SecurityContext @@ -421,6 +421,43 @@ func validateContainerPorts(ports []corev1.ContainerPort) *apis.FieldError { return errs } +func validateReadinessProbe(p *corev1.Probe) *apis.FieldError { + if p == nil { + return nil + } + + errs := validateProbe(p) + + if p.PeriodSeconds < 0 { + errs = errs.Also(apis.ErrOutOfBoundsValue(p.PeriodSeconds, 0, math.MaxInt32, "periodSeconds")) + } + + if p.SuccessThreshold < 1 { + errs = errs.Also(apis.ErrOutOfBoundsValue(p.SuccessThreshold, 1, math.MaxInt32, "successThreshold")) + } + + // PeriodSeconds == 0 indicates Knative's special probe with aggressive retries + if p.PeriodSeconds == 0 { + if p.FailureThreshold != 0 { + errs = errs.Also(apis.ErrDisallowedFields("failureThreshold")) + } + + if p.TimeoutSeconds != 0 { + errs = errs.Also(apis.ErrDisallowedFields("timeoutSeconds")) + } + } else { + if p.TimeoutSeconds < 1 { + errs = errs.Also(apis.ErrOutOfBoundsValue(p.TimeoutSeconds, 1, math.MaxInt32, "timeoutSeconds")) + } + + if p.FailureThreshold < 1 { + errs = errs.Also(apis.ErrOutOfBoundsValue(p.FailureThreshold, 1, math.MaxInt32, "failureThreshold")) + } + } + + return errs +} + func validateProbe(p *corev1.Probe) *apis.FieldError { if p == nil { return nil @@ -430,12 +467,26 @@ func validateProbe(p *corev1.Probe) *apis.FieldError { h := p.Handler errs = errs.Also(apis.CheckDisallowedFields(h, *HandlerMask(&h))) - switch { - case h.HTTPGet != nil: + var handlers []string + + if h.HTTPGet != nil { + handlers = append(handlers, "httpGet") errs = errs.Also(apis.CheckDisallowedFields(*h.HTTPGet, *HTTPGetActionMask(h.HTTPGet))).ViaField("httpGet") - case h.TCPSocket != nil: + } + if h.TCPSocket != nil { + handlers = append(handlers, "tcpSocket") errs = errs.Also(apis.CheckDisallowedFields(*h.TCPSocket, *TCPSocketActionMask(h.TCPSocket))).ViaField("tcpSocket") } + if h.Exec != nil { + handlers = append(handlers, "exec") + errs = errs.Also(apis.CheckDisallowedFields(*h.Exec, *ExecActionMask(h.Exec))).ViaField("exec") + } + + if len(handlers) == 0 { + errs = errs.Also(apis.ErrMissingField("handler")) + } else if len(handlers) > 1 { + errs = errs.Also(apis.ErrMultipleOneOf(handlers...)) + } return errs } diff --git a/pkg/apis/serving/k8s_validation_test.go b/pkg/apis/serving/k8s_validation_test.go index 26b24aac4a1f..899d78b51ad6 100644 --- a/pkg/apis/serving/k8s_validation_test.go +++ b/pkg/apis/serving/k8s_validation_test.go @@ -499,6 +499,10 @@ func TestContainerValidation(t *testing.T) { c: corev1.Container{ Image: "foo", ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, Handler: corev1.Handler{ HTTPGet: &corev1.HTTPGetAction{ Path: "/", @@ -512,11 +516,76 @@ func TestContainerValidation(t *testing.T) { }, }, want: nil, + }, { + name: "valid with exec probes ", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{}, + }, + }, + }, + want: nil, + }, { + name: "invalid with no handler", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{}, + }, + }, + want: apis.ErrMissingField("livenessProbe.handler"), + }, { + name: "invalid with multiple handlers", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + Exec: &corev1.ExecAction{}, + TCPSocket: &corev1.TCPSocketAction{}, + }, + }, + }, + want: apis.ErrMultipleOneOf("readinessProbe.exec", "readinessProbe.tcpSocket", "readinessProbe.httpGet"), }, { name: "invalid readiness http probe (has port)", c: corev1.Container{ Image: "foo", ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, Handler: corev1.Handler{ HTTPGet: &corev1.HTTPGetAction{ Path: "/", @@ -526,6 +595,56 @@ func TestContainerValidation(t *testing.T) { }, }, want: apis.ErrDisallowedFields("readinessProbe.httpGet.port"), + }, { + name: "invalid readiness probe (has failureThreshold while using special probe)", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 0, + FailureThreshold: 2, + SuccessThreshold: 1, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + }, + want: apis.ErrDisallowedFields("readinessProbe.failureThreshold"), + }, { + name: "invalid readiness probe (has timeoutSeconds while using special probe)", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 0, + TimeoutSeconds: 2, + SuccessThreshold: 1, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + }, + want: apis.ErrDisallowedFields("readinessProbe.timeoutSeconds"), + }, { + name: "out of bounds probe values", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: -1, + TimeoutSeconds: 0, + SuccessThreshold: 0, + FailureThreshold: 0, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{}, + }, + }, + }, + want: apis.ErrOutOfBoundsValue(-1, 0, math.MaxInt32, "readinessProbe.periodSeconds").Also( + apis.ErrOutOfBoundsValue(0, 1, math.MaxInt32, "readinessProbe.timeoutSeconds")).Also( + apis.ErrOutOfBoundsValue(0, 1, math.MaxInt32, "readinessProbe.successThreshold")).Also( + apis.ErrOutOfBoundsValue(0, 1, math.MaxInt32, "readinessProbe.failureThreshold")), }, { name: "disallowed security context field", c: corev1.Container{ diff --git a/pkg/apis/serving/v1alpha1/configuration_defaults_test.go b/pkg/apis/serving/v1alpha1/configuration_defaults_test.go index c0be3f5e3f3d..fd554331cde3 100644 --- a/pkg/apis/serving/v1alpha1/configuration_defaults_test.go +++ b/pkg/apis/serving/v1alpha1/configuration_defaults_test.go @@ -67,8 +67,9 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -96,9 +97,10 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -129,8 +131,9 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -149,7 +152,8 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Resources: defaultResources, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -164,8 +168,9 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, diff --git a/pkg/apis/serving/v1alpha1/revision_defaults_test.go b/pkg/apis/serving/v1alpha1/revision_defaults_test.go index 5ccf9f7b6347..c2572a2ab6ac 100644 --- a/pkg/apis/serving/v1alpha1/revision_defaults_test.go +++ b/pkg/apis/serving/v1alpha1/revision_defaults_test.go @@ -30,6 +30,13 @@ import ( "knative.dev/serving/pkg/apis/serving/v1beta1" ) +var defaultProbe = &corev1.Probe{ + SuccessThreshold: 1, + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, +} + func TestRevisionDefaulting(t *testing.T) { defer logtesting.ClearAll() tests := []struct { @@ -47,8 +54,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -66,8 +74,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -97,8 +106,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(123), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -127,7 +137,8 @@ func TestRevisionDefaulting(t *testing.T) { Name: "bar", ReadOnly: true, }}, - Resources: defaultResources, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, RevisionSpec: v1beta1.RevisionSpec{ ContainerConcurrency: 1, @@ -163,7 +174,8 @@ func TestRevisionDefaulting(t *testing.T) { Name: "bar", ReadOnly: true, }}, - Resources: defaultResources, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, ContainerConcurrency: 1, @@ -184,8 +196,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Image: "foo", - Resources: defaultResources, + Image: "foo", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -201,9 +214,10 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "foo", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "foo", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -227,8 +241,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -249,8 +264,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -273,8 +289,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, diff --git a/pkg/apis/serving/v1alpha1/service_defaults_test.go b/pkg/apis/serving/v1alpha1/service_defaults_test.go index fae03a6b179f..b24047f7ff0f 100644 --- a/pkg/apis/serving/v1alpha1/service_defaults_test.go +++ b/pkg/apis/serving/v1alpha1/service_defaults_test.go @@ -88,8 +88,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -124,9 +125,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -173,8 +175,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -207,8 +210,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -244,9 +248,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -294,8 +299,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -332,8 +338,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -368,8 +375,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -428,8 +436,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -486,8 +495,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -542,8 +552,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -580,9 +591,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "blah", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "blah", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -618,8 +630,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -666,9 +679,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "blah", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "blah", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -720,9 +734,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "blah", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "blah", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, diff --git a/pkg/apis/serving/v1beta1/configuration_defaults_test.go b/pkg/apis/serving/v1beta1/configuration_defaults_test.go index 50b253942afe..70e63fa2d3a5 100644 --- a/pkg/apis/serving/v1beta1/configuration_defaults_test.go +++ b/pkg/apis/serving/v1beta1/configuration_defaults_test.go @@ -41,8 +41,9 @@ func TestConfigurationDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), @@ -71,9 +72,10 @@ func TestConfigurationDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), @@ -103,9 +105,10 @@ func TestConfigurationDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(60), diff --git a/pkg/apis/serving/v1beta1/revision_defaults.go b/pkg/apis/serving/v1beta1/revision_defaults.go index 9d66497480f6..dc212293b1d7 100644 --- a/pkg/apis/serving/v1beta1/revision_defaults.go +++ b/pkg/apis/serving/v1beta1/revision_defaults.go @@ -83,6 +83,16 @@ func (rs *RevisionSpec) SetDefaults(ctx context.Context) { container.Resources.Limits[corev1.ResourceMemory] = *rsrc } } + if container.ReadinessProbe == nil { + container.ReadinessProbe = &corev1.Probe{} + } + if container.ReadinessProbe.TCPSocket == nil && container.ReadinessProbe.HTTPGet == nil && container.ReadinessProbe.Exec == nil { + container.ReadinessProbe.TCPSocket = &corev1.TCPSocketAction{} + } + + if container.ReadinessProbe.SuccessThreshold == 0 { + container.ReadinessProbe.SuccessThreshold = 1 + } vms := container.VolumeMounts for i := range vms { diff --git a/pkg/apis/serving/v1beta1/revision_defaults_test.go b/pkg/apis/serving/v1beta1/revision_defaults_test.go index 5e05e5f0f9c0..a5bfe188c83b 100644 --- a/pkg/apis/serving/v1beta1/revision_defaults_test.go +++ b/pkg/apis/serving/v1beta1/revision_defaults_test.go @@ -36,6 +36,12 @@ var ( Requests: corev1.ResourceList{}, Limits: corev1.ResourceList{}, } + defaultProbe = &corev1.Probe{ + SuccessThreshold: 1, + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + } ignoreUnexportedResources = cmpopts.IgnoreUnexported(resource.Quantity{}) ) @@ -54,8 +60,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -82,8 +89,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(123), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -114,7 +122,8 @@ func TestRevisionDefaulting(t *testing.T) { Name: "bar", ReadOnly: true, }}, - Resources: defaultResources, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, ContainerConcurrency: 1, @@ -130,6 +139,13 @@ func TestRevisionDefaulting(t *testing.T) { PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ Name: "foo", + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, }}, }, }, @@ -142,14 +158,34 @@ func TestRevisionDefaulting(t *testing.T) { Containers: []corev1.Container{{ Name: "foo", Resources: defaultResources, + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, }}, }, }, }, }, { - name: "partially initialized", + name: "no overwrite exec", in: &Revision{ - Spec: RevisionSpec{}, + Spec: RevisionSpec{ + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"echo", "hi"}, + }, + }, + }, + }}, + }, + }, }, want: &Revision{ Spec: RevisionSpec{ @@ -158,6 +194,31 @@ func TestRevisionDefaulting(t *testing.T) { Containers: []corev1.Container{{ Name: config.DefaultUserContainerName, Resources: defaultResources, + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"echo", "hi"}, + }, + }, + }, + }}, + }, + }, + }, + }, { + name: "partially initialized", + in: &Revision{ + Spec: RevisionSpec{}, + }, + want: &Revision{ + Spec: RevisionSpec{ + TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, diff --git a/pkg/apis/serving/v1beta1/service_defaults_test.go b/pkg/apis/serving/v1beta1/service_defaults_test.go index 73125af8d746..32c73ff16008 100644 --- a/pkg/apis/serving/v1beta1/service_defaults_test.go +++ b/pkg/apis/serving/v1beta1/service_defaults_test.go @@ -45,8 +45,9 @@ func TestServiceDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), @@ -85,9 +86,10 @@ func TestServiceDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), @@ -127,9 +129,10 @@ func TestServiceDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(60), @@ -181,9 +184,10 @@ func TestServiceDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), diff --git a/pkg/queue/readiness/probe_encoding.go b/pkg/queue/readiness/probe_encoding.go index ba59b949266f..0d12416ae616 100644 --- a/pkg/queue/readiness/probe_encoding.go +++ b/pkg/queue/readiness/probe_encoding.go @@ -19,6 +19,8 @@ package readiness import ( "encoding/json" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" ) @@ -33,6 +35,10 @@ func DecodeProbe(jsonProbe string) (*corev1.Probe, error) { // EncodeProbe takes *corev1.Probe object and returns marshalled Probe JSON string and an error. func EncodeProbe(rp *corev1.Probe) (string, error) { + if rp == nil { + return "", errors.New("cannot encode nil probe") + } + probeJSON, err := json.Marshal(rp) if err != nil { return "", err diff --git a/pkg/queue/readiness/probe_encoding_test.go b/pkg/queue/readiness/probe_encoding_test.go index 5868d1db3768..7ae97b4ada40 100644 --- a/pkg/queue/readiness/probe_encoding_test.go +++ b/pkg/queue/readiness/probe_encoding_test.go @@ -84,3 +84,15 @@ func TestEncodeProbe(t *testing.T) { t.Errorf("Probe diff: %s; got %v, want %v", diff, jsonProbe, wantProbe) } } + +func TestEncodeNilProbe(t *testing.T) { + jsonProbe, err := EncodeProbe(nil) + + if err == nil { + t.Errorf("Expected error") + } + + if jsonProbe != "" { + t.Errorf("Expected empty probe string; got %s", jsonProbe) + } +} diff --git a/pkg/reconciler/revision/resources/deploy.go b/pkg/reconciler/revision/resources/deploy.go index 48742790a5a4..22c91212bee5 100644 --- a/pkg/reconciler/revision/resources/deploy.go +++ b/pkg/reconciler/revision/resources/deploy.go @@ -134,8 +134,15 @@ func makePodSpec(rev *v1alpha1.Revision, loggingConfig *logging.Config, observab userContainer.TerminationMessagePolicy = corev1.TerminationMessageFallbackToLogsOnError } + if userContainer.ReadinessProbe != nil { + if userContainer.ReadinessProbe.HTTPGet != nil || userContainer.ReadinessProbe.TCPSocket != nil { + // HTTP and TCP ReadinessProbes are executed by the queue-proxy directly against the + // user-container instead of via kubelet. + userContainer.ReadinessProbe = nil + } + } + // If the client provides probes, we should fill in the port for them. - rewriteUserProbe(userContainer.ReadinessProbe, userPortInt) rewriteUserProbe(userContainer.LivenessProbe, userPortInt) podSpec := &corev1.PodSpec{ diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go index 184068988225..2569e29e4666 100644 --- a/pkg/reconciler/revision/resources/deploy_test.go +++ b/pkg/reconciler/revision/resources/deploy_test.go @@ -69,10 +69,18 @@ var ( } defaultQueueContainer = &corev1.Container{ - Name: QueueContainerName, - Resources: createQueueResources(make(map[string]string), &corev1.Container{}), - Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe-period", "0"}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + }, SecurityContext: queueSecurityContext, Env: []corev1.EnvVar{{ Name: "SERVING_NAMESPACE", @@ -251,6 +259,12 @@ func withEnvVar(name, value string) containerOption { } } +func withArgs(args []string) containerOption { + return func(container *corev1.Container) { + container.Args = append(container.Args, args...) + } +} + func withInternalVolumeMount() containerOption { return func(container *corev1.Container) { container.VolumeMounts = append(container.VolumeMounts, internalVolumeMount) @@ -394,6 +408,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("USER_PORT", "8888"), + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -437,6 +452,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("USER_PORT", "8888"), + withEnvVar("SERVING_READINESS_PROBE", ""), ), }, withAppendedVolumes(corev1.Volume{ Name: "asdf", @@ -458,6 +474,7 @@ func TestMakePodSpec(t *testing.T) { userContainer(), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -481,6 +498,7 @@ func TestMakePodSpec(t *testing.T) { }), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -499,6 +517,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("SERVING_CONFIGURATION", "parent-config"), withEnvVar("CONTAINER_CONCURRENCY", "1"), + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -514,20 +533,41 @@ func TestMakePodSpec(t *testing.T) { cc: &deployment.Config{}, want: podSpec( []corev1.Container{ - userContainer( - withHTTPQPReadinessProbe, + userContainer(), + queueContainer( + withEnvVar("CONTAINER_CONCURRENCY", "0"), + withEnvVar("SERVING_READINESS_PROBE", `{"httpGet":{"path":"/","port":8080,"host":"127.0.0.1","scheme":"HTTP","httpHeaders":[{"name":"K-Kubelet-Probe","value":"queue"}]}}`), ), + }), + }, { + name: "with tcp readiness probe", + rev: revision(func(revision *v1alpha1.Revision) { + container(revision.Spec.GetContainer(), + withReadinessProbe(corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromInt(12345), + }, + }), + ) + }), + lc: &logging.Config{}, + oc: &metrics.ObservabilityConfig{}, + ac: &autoscaler.Config{}, + cc: &deployment.Config{}, + want: podSpec( + []corev1.Container{ + userContainer(), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), + withEnvVar("SERVING_READINESS_PROBE", `{"tcpSocket":{"port":8080,"host":"127.0.0.1"}}`), ), }), }, { name: "with shell readiness probe", rev: revision(func(revision *v1alpha1.Revision) { container(revision.Spec.GetContainer(), - withExecReadinessProbe( - []string{"echo", "hello"}, - ), + withExecReadinessProbe([]string{"echo", "hello"}), ) }), lc: &logging.Config{}, @@ -537,12 +577,10 @@ func TestMakePodSpec(t *testing.T) { want: podSpec( []corev1.Container{ userContainer( - withExecReadinessProbe( - []string{"echo", "hello"}, - ), - ), + withExecReadinessProbe([]string{"echo", "hello"})), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), + withEnvVar("SERVING_READINESS_PROBE", "{}"), ), }), }, { @@ -576,6 +614,7 @@ func TestMakePodSpec(t *testing.T) { ), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -602,6 +641,7 @@ func TestMakePodSpec(t *testing.T) { ), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -620,6 +660,7 @@ func TestMakePodSpec(t *testing.T) { withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("ENABLE_VAR_LOG_COLLECTION", "true"), withInternalVolumeMount(), + withEnvVar("SERVING_READINESS_PROBE", ""), ), }, func(podSpec *corev1.PodSpec) { @@ -685,6 +726,7 @@ func TestMakePodSpec(t *testing.T) { ), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), + withEnvVar("SERVING_READINESS_PROBE", ""), withEnvVar("SERVING_SERVICE", ""), ), }), @@ -695,7 +737,6 @@ func TestMakePodSpec(t *testing.T) { quantityComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 }) - got := makePodSpec(test.rev, test.lc, test.oc, test.ac, test.cc) if diff := cmp.Diff(test.want, got, quantityComparer); diff != "" { t.Errorf("makePodSpec (-want, +got) = %v", diff) @@ -712,7 +753,6 @@ func TestMakePodSpec(t *testing.T) { *test.rev.Spec.DeprecatedContainer, } test.rev.Spec.DeprecatedContainer = nil - got := makePodSpec(test.rev, test.lc, test.oc, test.ac, test.cc) if diff := cmp.Diff(test.want, got, quantityComparer); diff != "" { t.Errorf("makePodSpec (-want, +got) = %v", diff) diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index 90a25c0ef65a..27dfbbae33ea 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "knative.dev/pkg/logging" pkgmetrics "knative.dev/pkg/metrics" "knative.dev/pkg/ptr" @@ -33,9 +34,15 @@ import ( "knative.dev/serving/pkg/autoscaler" "knative.dev/serving/pkg/deployment" "knative.dev/serving/pkg/metrics" + "knative.dev/serving/pkg/network" + "knative.dev/serving/pkg/queue" + "knative.dev/serving/pkg/queue/readiness" ) -const requestQueueHTTPPortName = "queue-port" +const ( + localAddress = "127.0.0.1" + requestQueueHTTPPortName = "queue-port" +) var ( queueHTTPPort = corev1.ContainerPort{ @@ -58,23 +65,6 @@ var ( ContainerPort: int32(networking.UserQueueMetricsPort), }} - queueReadinessProbe = &corev1.Probe{ - Handler: corev1.Handler{ - Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "true"}, - }, - }, - // We want to mark the service as not ready as soon as the - // PreStop handler is called, so we need to check a little - // bit more often than the default. It is a small - // sacrifice for a low rate of 503s. - PeriodSeconds: 1, - // We keep the connection open for a while because we're - // actively probing the user-container on that endpoint and - // thus don't want to be limited by K8s granularity here. - TimeoutSeconds: 10, - } - queueSecurityContext = &corev1.SecurityContext{ AllowPrivilegeEscalation: ptr.Bool(false), } @@ -156,6 +146,51 @@ func createResourcePercentageFromAnnotations(m map[string]string, k string) (boo return true, float32(value / 100) } +func makeQueueProbe(in *corev1.Probe) *corev1.Probe { + if in == nil || in.PeriodSeconds == 0 { + out := &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe-period", "0"}, + }, + }, + // We want to mark the service as not ready as soon as the + // PreStop handler is called, so we need to check a little + // bit more often than the default. It is a small + // sacrifice for a low rate of 503s. + PeriodSeconds: 1, + // We keep the connection open for a while because we're + // actively probing the user-container on that endpoint and + // thus don't want to be limited by K8s granularity here. + TimeoutSeconds: 10, + } + + if in != nil { + out.InitialDelaySeconds = in.InitialDelaySeconds + } + return out + } + + timeout := 1 + + if in.TimeoutSeconds > 1 { + timeout = int(in.TimeoutSeconds) + } + + return &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe-period", strconv.Itoa(timeout)}, + }, + }, + PeriodSeconds: in.PeriodSeconds, + TimeoutSeconds: int32(timeout), + SuccessThreshold: in.SuccessThreshold, + FailureThreshold: in.FailureThreshold, + InitialDelaySeconds: in.InitialDelaySeconds, + } +} + // makeQueueContainer creates the container spec for the queue sidecar. func makeQueueContainer(rev *v1alpha1.Revision, loggingConfig *logging.Config, observabilityConfig *metrics.ObservabilityConfig, autoscalerConfig *autoscaler.Config, deploymentConfig *deployment.Config) *corev1.Container { @@ -191,12 +226,19 @@ func makeQueueContainer(rev *v1alpha1.Revision, loggingConfig *logging.Config, o volumeMounts = append(volumeMounts, internalVolumeMount) } + rp := rev.Spec.GetContainer().ReadinessProbe.DeepCopy() + + applyReadinessProbeDefaults(rp, userPort) + + // TODO(joshrider) bubble up error instead of squashing it here + probeJSON, _ := readiness.EncodeProbe(rp) + return &corev1.Container{ Name: QueueContainerName, Image: deploymentConfig.QueueSidecarImage, Resources: createQueueResources(rev.GetAnnotations(), rev.Spec.GetContainer()), Ports: ports, - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: makeQueueProbe(rp), VolumeMounts: volumeMounts, SecurityContext: queueSecurityContext, Env: []corev1.EnvVar{{ @@ -267,6 +309,38 @@ func makeQueueContainer(rev *v1alpha1.Revision, loggingConfig *logging.Config, o }, { Name: "INTERNAL_VOLUME_PATH", Value: internalVolumePath, + }, { + Name: "SERVING_READINESS_PROBE", + Value: probeJSON, }}, } } + +func applyReadinessProbeDefaults(p *corev1.Probe, port int32) { + switch { + case p == nil: + return + case p.HTTPGet != nil: + p.HTTPGet.Host = localAddress + p.HTTPGet.Port = intstr.FromInt(int(port)) + + if p.HTTPGet.Scheme == "" { + p.HTTPGet.Scheme = corev1.URISchemeHTTP + } + + p.HTTPGet.HTTPHeaders = append(p.HTTPGet.HTTPHeaders, corev1.HTTPHeader{ + Name: network.KubeletProbeHeaderName, + Value: queue.Name, + }) + case p.TCPSocket != nil: + p.TCPSocket.Host = localAddress + p.TCPSocket.Port = intstr.FromInt(int(port)) + case p.Exec != nil: + //User-defined ExecProbe will still be run on user-container. + p.Exec = nil + } + + if p.PeriodSeconds > 0 && p.TimeoutSeconds < 1 { + p.TimeoutSeconds = 1 + } +} diff --git a/pkg/reconciler/revision/resources/queue_test.go b/pkg/reconciler/revision/resources/queue_test.go index d8b581e40377..5e9c8aa02598 100644 --- a/pkg/reconciler/revision/resources/queue_test.go +++ b/pkg/reconciler/revision/resources/queue_test.go @@ -17,18 +17,19 @@ limitations under the License. package resources import ( + "encoding/json" "sort" "strconv" "testing" - "knative.dev/serving/pkg/resources" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap/zapcore" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "knative.dev/pkg/logging" pkgmetrics "knative.dev/pkg/metrics" _ "knative.dev/pkg/metrics/testing" @@ -42,8 +43,27 @@ import ( "knative.dev/serving/pkg/autoscaler" "knative.dev/serving/pkg/deployment" "knative.dev/serving/pkg/metrics" + "knative.dev/serving/pkg/network" + "knative.dev/serving/pkg/resources" ) +var defaultKnativeQReadinessProbe = &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe-period", "0"}, + }, + }, + // We want to mark the service as not ready as soon as the + // PreStop handler is called, so we need to check a little + // bit more often than the default. It is a small + // sacrifice for a low rate of 503s. + PeriodSeconds: 1, + // We keep the connection open for a while because we're + // actively probing the user-container on that endpoint and + // thus don't want to be limited by K8s granularity here. + TimeoutSeconds: 10, +} + func TestMakeQueueContainer(t *testing.T) { tests := []struct { name string @@ -77,7 +97,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(nil), @@ -117,7 +137,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTP2Port), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -155,7 +175,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -193,7 +213,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -232,7 +252,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -267,7 +287,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -298,7 +318,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -332,7 +352,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -353,7 +373,12 @@ func TestMakeQueueContainer(t *testing.T) { } got := makeQueueContainer(test.rev, test.lc, test.oc, test.ac, test.cc) + test.want.Env = append(test.want.Env, corev1.EnvVar{ + Name: "SERVING_READINESS_PROBE", + Value: probeJSON(test.rev.Spec.GetContainer().ReadinessProbe), + }) sortEnv(got.Env) + sortEnv(test.want.Env) if diff := cmp.Diff(test.want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { t.Errorf("makeQueueContainer (-want, +got) = %v", diff) } @@ -421,7 +446,7 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { }, }, Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -476,7 +501,7 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { }, }, Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -530,7 +555,7 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { }, }, Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -584,7 +609,7 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { }, }, Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -597,7 +622,12 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { got := makeQueueContainer(test.rev, test.lc, test.oc, test.ac, test.cc) + test.want.Env = append(test.want.Env, corev1.EnvVar{ + Name: "SERVING_READINESS_PROBE", + Value: probeJSON(test.rev.Spec.GetContainer().ReadinessProbe), + }) sortEnv(got.Env) + sortEnv(test.want.Env) if diff := cmp.Diff(test.want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { t.Errorf("makeQueueContainerWithPercentageAnnotation (-want, +got) = %v", diff) } @@ -617,6 +647,362 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { } } +func TestProbeGenerationHTTPDefaults(t *testing.T) { + rev := &v1alpha1.Revision{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + UID: "1234", + }, + Spec: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + }, + }}, + }, + }, + }, + } + + expectedProbe := &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: "127.0.0.1", + Path: "/", + Port: intstr.FromInt(int(v1alpha1.DefaultUserPort)), + Scheme: corev1.URISchemeHTTP, + HTTPHeaders: []corev1.HTTPHeader{{ + Name: network.KubeletProbeHeaderName, + Value: "queue", + }}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + } + + lc := &logging.Config{} + oc := &metrics.ObservabilityConfig{} + ac := &autoscaler.Config{} + cc := &deployment.Config{} + want := &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe-period", "10"}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{ + "SERVING_READINESS_PROBE": probeJSON(expectedProbe), + }), + SecurityContext: queueSecurityContext, + } + + got := makeQueueContainer(rev, lc, oc, ac, cc) + sortEnv(got.Env) + if diff := cmp.Diff(want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { + t.Errorf("makeQueueContainer(-want, +got) = %v", diff) + } +} + +func TestProbeGenerationHTTP(t *testing.T) { + userPort := 12345 + probePath := "/health" + + rev := &v1alpha1.Revision{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + UID: "1234", + }, + Spec: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + Ports: []v1.ContainerPort{{ + ContainerPort: int32(userPort), + }}, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: probePath, + Scheme: corev1.URISchemeHTTPS, + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 10, + }, + }}, + }, + }, + }, + } + + expectedProbe := &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: "127.0.0.1", + Path: probePath, + Port: intstr.FromInt(userPort), + Scheme: corev1.URISchemeHTTPS, + HTTPHeaders: []corev1.HTTPHeader{{ + Name: network.KubeletProbeHeaderName, + Value: "queue", + }}, + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 10, + } + + lc := &logging.Config{} + oc := &metrics.ObservabilityConfig{} + ac := &autoscaler.Config{} + cc := &deployment.Config{} + want := &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe-period", "10"}, + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 10, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{"USER_PORT": strconv.Itoa(userPort), "SERVING_READINESS_PROBE": probeJSON(expectedProbe)}), + SecurityContext: queueSecurityContext, + } + + got := makeQueueContainer(rev, lc, oc, ac, cc) + sortEnv(got.Env) + if diff := cmp.Diff(want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { + t.Errorf("makeQueueContainer(-want, +got) = %v", diff) + } +} + +func TestTCPProbeGeneration(t *testing.T) { + userPort := 12345 + tests := []struct { + name string + rev v1alpha1.RevisionSpec + want *corev1.Container + wantProbe *corev1.Probe + }{{ + name: "knative tcp probe", + wantProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromInt(userPort), + }, + }, + PeriodSeconds: 0, + SuccessThreshold: 3, + }, + rev: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + Ports: []v1.ContainerPort{{ + ContainerPort: int32(userPort), + }}, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + PeriodSeconds: 0, + SuccessThreshold: 3, + }, + }}, + }, + }, + }, + want: &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe-period", "0"}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{"USER_PORT": strconv.Itoa(userPort)}), + SecurityContext: queueSecurityContext, + }, + }, { + name: "tcp defaults", + rev: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + PeriodSeconds: 1, + }, + }}, + }, + }, + }, + wantProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromInt(int(v1alpha1.DefaultUserPort)), + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 1, + }, + want: &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe-period", "1"}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 1, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{}), + SecurityContext: queueSecurityContext, + }, + }, { + name: "user defined tcp probe", + wantProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromInt(userPort), + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 7, + InitialDelaySeconds: 3, + }, + rev: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + Ports: []v1.ContainerPort{{ + ContainerPort: int32(userPort), + }}, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + PeriodSeconds: 2, + TimeoutSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 7, + InitialDelaySeconds: 3, + }, + }}, + }, + }, + }, + want: &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe-period", "15"}, + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 7, + InitialDelaySeconds: 3, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{"USER_PORT": strconv.Itoa(userPort)}), + SecurityContext: queueSecurityContext, + }, + }} + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + lc := &logging.Config{} + oc := &metrics.ObservabilityConfig{} + ac := &autoscaler.Config{} + cc := &deployment.Config{} + testRev := &v1alpha1.Revision{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + UID: "1234", + }, + Spec: test.rev, + } + test.want.Env = append(test.want.Env, corev1.EnvVar{ + Name: "SERVING_READINESS_PROBE", + Value: probeJSON(test.wantProbe), + }) + + got := makeQueueContainer(testRev, lc, oc, ac, cc) + sortEnv(got.Env) + sortEnv(test.want.Env) + if diff := cmp.Diff(test.want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { + t.Errorf("makeQueueContainer (-want, +got) = %v", diff) + } + }) + } +} + var defaultEnv = map[string]string{ "SERVING_NAMESPACE": "foo", "SERVING_SERVICE": "", @@ -638,6 +1024,15 @@ var defaultEnv = map[string]string{ "INTERNAL_VOLUME_PATH": internalVolumePath, } +func probeJSON(probe *corev1.Probe) string { + if probe != nil { + if probeBytes, err := json.Marshal(probe); err == nil { + return string(probeBytes) + } + } + return "" +} + func env(overrides map[string]string) []corev1.EnvVar { values := resources.UnionMaps(defaultEnv, overrides)