From 5f8ac7587403e7b76a82a80edd31c4ea49c93245 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Fri, 31 May 2019 10:39:15 -0400 Subject: [PATCH 1/9] use user-defined readinessprobe in queue-proxy Signed-off-by: Shash Reddy Co-authored-by: Shash Reddy --- cmd/queue/main.go | 124 ++--- cmd/queue/main_test.go | 137 ++++-- pkg/apis/serving/k8s_validation.go | 43 +- pkg/apis/serving/k8s_validation_test.go | 100 ++++ .../v1alpha1/configuration_defaults_test.go | 25 +- .../v1alpha1/revision_defaults_test.go | 55 ++- .../serving/v1alpha1/service_defaults_test.go | 85 ++-- .../v1beta1/configuration_defaults_test.go | 19 +- pkg/apis/serving/v1beta1/revision_defaults.go | 10 + .../serving/v1beta1/revision_defaults_test.go | 75 ++- .../serving/v1beta1/service_defaults_test.go | 26 +- pkg/reconciler/revision/resources/deploy.go | 9 +- .../revision/resources/deploy_test.go | 66 ++- pkg/reconciler/revision/resources/queue.go | 117 ++++- .../revision/resources/queue_test.go | 427 +++++++++++++++++- 15 files changed, 1079 insertions(+), 239 deletions(-) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index a63170b18de3..51f8dcef59d3 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -31,6 +31,12 @@ import ( "github.com/kelseyhightower/envconfig" "github.com/pkg/errors" + "go.opencensus.io/stats" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/pkg/logging/logkey" + "knative.dev/pkg/metrics" + "knative.dev/pkg/signals" "knative.dev/serving/pkg/activator" activatorutil "knative.dev/serving/pkg/activator/util" "knative.dev/serving/pkg/apis/networking" @@ -40,16 +46,8 @@ import ( "knative.dev/serving/pkg/network" "knative.dev/serving/pkg/queue" "knative.dev/serving/pkg/queue/health" + "knative.dev/serving/pkg/queue/readiness" queuestats "knative.dev/serving/pkg/queue/stats" - - "go.opencensus.io/stats" - "go.uber.org/zap" - - "knative.dev/pkg/logging/logkey" - "knative.dev/pkg/metrics" - "knative.dev/pkg/signals" - - "k8s.io/apimachinery/pkg/util/wait" ) const ( @@ -67,10 +65,6 @@ const ( // in the mesh. quitSleepDuration = 20 * time.Second - // Set equal to the queue-proxy's ExecProbe timeout to take - // advantage of the full window - probeTimeout = 10 * time.Second - badProbeTemplate = "unexpected probe header value: %s" // Metrics' names (without component prefix). @@ -84,6 +78,11 @@ const ( requestQueueHealthPath = "/health" healthURLTemplate = "http://127.0.0.1:%d" + requestQueueHealthPath + tcpProbeTimeout = 100 * time.Millisecond + // The 25 millisecond retry interval is an unscientific compromise between wanting to get + // started as early as possible while still wanting to give the container some breathing + // room to get up and running. + aggressivePollInterval = 25 * time.Millisecond ) var ( @@ -98,8 +97,6 @@ var ( healthState = &health.State{} promStatReporter *queue.PrometheusStatsReporter // Prometheus stats reporter. - probe = flag.Bool("probe", false, "run readiness probe") - // Metric counters. requestCountM = stats.Int64( requestCountN, @@ -117,6 +114,8 @@ var ( appResponseTimeInMsecN, "The response time in millisecond", stats.UnitMilliseconds) + readinessProbeTimeout = flag.Int("probe", -1, "run readiness probe with given timeout") + ucProbe = flag.String("readiness-probe", "", "JSON readiness probe configuration for user container") ) type config struct { @@ -174,29 +173,8 @@ func knativeProxyHeader(r *http.Request) string { return r.Header.Get(network.ProxyHeaderName) } -func probeUserContainer() bool { - var err error - wait.PollImmediate(50*time.Millisecond, probeTimeout, func() (bool, error) { - logger.Debug("TCP probing the user-container.") - config := health.TCPProbeConfigOptions{ - Address: userTargetAddress, - SocketTimeout: 100 * time.Millisecond, - } - err = health.TCPProbe(config) - return err == nil, nil - }) - - if err == nil { - logger.Info("User-container successfully probed.") - } else { - logger.Errorw("User-container could not be probed successfully.", zap.Error(err)) - } - - return err == nil -} - // Make handler a closure for testing. -func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.Handler) func(http.ResponseWriter, *http.Request) { +func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.Handler, prober func() bool) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { ph := knativeProbeHeader(r) switch { @@ -205,12 +183,17 @@ func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.H http.Error(w, fmt.Sprintf(badProbeTemplate, ph), http.StatusBadRequest) return } - if probeUserContainer() { - // Respond with the name of the component handling the request. - w.Write([]byte(queue.Name)) + if prober != nil { + if prober() { + // Respond with the name of the component handling the request. + w.Write([]byte(queue.Name)) + } else { + http.Error(w, "container not ready", http.StatusServiceUnavailable) + } } else { - http.Error(w, "container not ready", http.StatusServiceUnavailable) + http.Error(w, "no probe", http.StatusInternalServerError) } + return case network.IsKubeletProbe(r): // Do not count health checks for concurrency metrics @@ -243,39 +226,61 @@ func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.H } // Sets up /health and /wait-for-drain endpoints. -func createAdminHandlers() *http.ServeMux { +func createAdminHandlers(p *readiness.Probe) *http.ServeMux { mux := http.NewServeMux() - // TODO(@joshrider): temporary change while waiting on other PRs to merge (See #4014) - mux.HandleFunc(requestQueueHealthPath, healthState.HealthHandler(probeUserContainer, true /*isNotAggressive*/)) + + mux.HandleFunc(requestQueueHealthPath, healthState.HealthHandler(p.ProbeContainer, p.IsAggressive())) mux.HandleFunc(queue.RequestQueueDrainPath, healthState.DrainHandler()) return mux } -func probeQueueHealthPath(port int, timeout time.Duration) error { +func probeQueueHealthPath(port int, timeoutSeconds int) error { url := fmt.Sprintf(healthURLTemplate, port) + // Use aggressive sub-second retries. + if timeoutSeconds == 0 { + return knativeProbe(url) + } + + httpClient := &http.Client{ + Transport: &http.Transport{ + // Do not use the cached connection + DisableKeepAlives: true, + }, + Timeout: time.Duration(timeoutSeconds) * time.Second, + } + + res, err := httpClient.Get(url) + + if err != nil { + return errors.Wrap(err, "failed to probe") + } + if !health.IsHTTPProbeReady(res) { + return errors.New("probe returned not ready") + } + + return nil +} + +func knativeProbe(url string) error { httpClient := &http.Client{ Transport: &http.Transport{ // Do not use the cached connection DisableKeepAlives: true, }, - Timeout: timeout, + Timeout: readiness.PollTimeout, } var lastErr error - // The 25 millisecond retry interval is an unscientific compromise between wanting to get - // started as early as possible while still wanting to give the container some breathing - // room to get up and running. - timeoutErr := wait.PollImmediate(25*time.Millisecond, timeout, func() (bool, error) { + timeoutErr := wait.PollImmediate(aggressivePollInterval, readiness.PollTimeout, func() (bool, error) { var res *http.Response if res, lastErr = httpClient.Get(url); res == nil { return false, nil } defer res.Body.Close() - - return res.StatusCode == http.StatusOK, nil + return health.IsHTTPProbeReady(res), nil }) if lastErr != nil { @@ -293,8 +298,8 @@ func probeQueueHealthPath(port int, timeout time.Duration) error { func main() { flag.Parse() - if *probe { - if err := probeQueueHealthPath(networking.QueueAdminPort, probeTimeout); err != nil { + if *readinessProbeTimeout >= 0 { + if err := probeQueueHealthPath(networking.QueueAdminPort, *readinessProbeTimeout); err != nil { // used instead of the logger to produce a concise event message fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -356,9 +361,16 @@ func main() { StatChan: statChan, }, time.Now()) + coreProbe, err := readiness.DecodeProbe(*ucProbe) + if err != nil { + logger.Fatalw("Queue container failed to parse readiness probe", zap.Error(err)) + } + + rp := readiness.NewProbe(coreProbe, logger.With(zap.String(logkey.Key, "readinessProbe"))) + adminServer := &http.Server{ Addr: ":" + strconv.Itoa(networking.QueueAdminPort), - Handler: createAdminHandlers(), + Handler: createAdminHandlers(rp), } metricsSupported := false @@ -379,7 +391,7 @@ func main() { if metricsSupported { composedHandler = pushRequestMetricHandler(httpProxy, appRequestCountM, appResponseTimeInMsecM, env) } - composedHandler = http.HandlerFunc(handler(reqChan, breaker, composedHandler)) + composedHandler = http.HandlerFunc(handler(reqChan, breaker, composedHandler, rp.ProbeContainer)) composedHandler = queue.ForwardedShimHandler(composedHandler) composedHandler = queue.TimeToFirstByteTimeoutHandler(composedHandler, time.Duration(env.RevisionTimeoutSeconds)*time.Second, "request timeout") diff --git a/cmd/queue/main_test.go b/cmd/queue/main_test.go index 6db5344824d2..b9ae21df7372 100644 --- a/cmd/queue/main_test.go +++ b/cmd/queue/main_test.go @@ -30,8 +30,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - - logtesting "knative.dev/pkg/logging/testing" "knative.dev/serving/pkg/activator" "knative.dev/serving/pkg/network" "knative.dev/serving/pkg/queue" @@ -70,7 +68,7 @@ func TestHandlerReqEvent(t *testing.T) { params := queue.BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10} breaker := queue.NewBreaker(params) reqChan := make(chan queue.ReqEvent, 10) - h := handler(reqChan, breaker, proxy) + h := handler(reqChan, breaker, proxy, func() bool { return true }) writer := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "http://example.com", nil) @@ -91,48 +89,56 @@ func TestHandlerReqEvent(t *testing.T) { } } -func TestProberHandler(t *testing.T) { - defer logtesting.ClearAll() - logger = logtesting.TestLogger(t) - - // All arguments are needed only for serving. - h := handler(nil, nil, nil) - - writer := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodPost, "http://example.com", nil) - - req.Header.Set(network.ProbeHeaderName, "test-probe") - req.Header.Set(network.ProxyHeaderName, activator.Name) - - h(writer, req) - - // Should get 400. - if got, want := writer.Code, http.StatusBadRequest; got != want { - t.Errorf("Bad probe status = %v, want: %v", got, want) - } - if got, want := strings.TrimSpace(writer.Body.String()), fmt.Sprintf(badProbeTemplate, "test-probe"); got != want { - // \r\n might be inserted, etc. - t.Errorf("Bad probe body = %q, want: %q, diff: %s", got, want, cmp.Diff(got, want)) - } - - // Fix up the header. - writer = httptest.NewRecorder() - req.Header.Set(network.ProbeHeaderName, queue.Name) - - server := httptest.NewServer(http.HandlerFunc(h)) - defer server.Close() - userTargetAddress = strings.TrimPrefix(server.URL, "http://") - h(writer, req) - - // Should get 200. - if got, want := writer.Code, http.StatusOK; got != want { - t.Errorf("Good probe status = %v, want: %v", got, want) - } - - // Body should be the `queue`. - if got, want := strings.TrimSpace(writer.Body.String()), queue.Name; got != want { - // \r\n might be inserted, etc. - t.Errorf("Good probe body = %q, want: %q, diff: %s", got, want, cmp.Diff(got, want)) +func TestProbeHandler(t *testing.T) { + testcases := []struct { + name string + prober func() bool + wantCode int + wantBody string + requestHeader string + }{{ + name: "unexpected probe header", + prober: func() bool { return true }, + wantCode: http.StatusBadRequest, + wantBody: fmt.Sprintf(badProbeTemplate, "test-probe"), + requestHeader: "test-probe", + }, { + name: "true probe function", + prober: func() bool { return true }, + wantCode: http.StatusOK, + wantBody: queue.Name, + requestHeader: queue.Name, + }, { + name: "nil probe function", + prober: nil, + wantCode: http.StatusInternalServerError, + wantBody: "no probe", + requestHeader: queue.Name, + }, { + name: "false probe function", + prober: func() bool { return false }, + wantCode: http.StatusServiceUnavailable, + wantBody: "container not ready", + requestHeader: queue.Name, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + writer := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "http://example.com", nil) + req.Header.Set(network.ProbeHeaderName, tc.requestHeader) + + h := handler(nil, nil, nil, tc.prober) + h(writer, req) + + if got, want := writer.Code, tc.wantCode; got != want { + t.Errorf("probe status = %v, want: %v", got, want) + } + if got, want := strings.TrimSpace(writer.Body.String()), tc.wantBody; got != want { + // \r\n might be inserted, etc. + t.Errorf("probe body = %q, want: %q, diff: %s", got, want, cmp.Diff(got, want)) + } + }) } } @@ -164,7 +170,8 @@ func TestCreateVarLogLink(t *testing.T) { func TestProbeQueueConnectionFailure(t *testing.T) { port := 12345 // some random port (that's not listening) - if err := probeQueueHealthPath(port, time.Second); err == nil { + + if err := probeQueueHealthPath(port, 1); err == nil { t.Error("Expected error, got nil") } } @@ -188,7 +195,7 @@ func TestProbeQueueNotReady(t *testing.T) { t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err) } - err = probeQueueHealthPath(port, 1*time.Second) + err = probeQueueHealthPath(port, 1) if diff := cmp.Diff(err.Error(), "probe returned not ready"); diff != "" { t.Errorf("Unexpected not ready message: %s", diff) @@ -218,8 +225,8 @@ func TestProbeQueueReady(t *testing.T) { t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err) } - if err = probeQueueHealthPath(port, 1*time.Second); err != nil { - t.Errorf("probeQueueHealthPath(%d) = %s", port, err) + if err = probeQueueHealthPath(port, 1); err != nil { + t.Errorf("probeQueueHealthPath(%d, 1s) = %s", port, err) } if !queueProbed { @@ -227,6 +234,35 @@ func TestProbeQueueReady(t *testing.T) { } } +func TestProbeQueueTimeout(t *testing.T) { + queueProbed := false + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + queueProbed = true + time.Sleep(2 * time.Second) + w.WriteHeader(http.StatusOK) + })) + + defer ts.Close() + + portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + + port, err := strconv.Atoi(portStr) + if err != nil { + t.Fatalf("failed to convert port(%s) to int", portStr) + } + + timeout := 1 + if err = probeQueueHealthPath(port, timeout); err == nil { + t.Errorf("Expected probeQueueHealthPath(%d, %v) to return timeout error", port, timeout) + } + + ts.Close() + + if !queueProbed { + t.Errorf("Expected the queue proxy server to be probed") + } +} + func TestProbeQueueDelayedReady(t *testing.T) { count := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -250,7 +286,8 @@ func TestProbeQueueDelayedReady(t *testing.T) { t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err) } - if err := probeQueueHealthPath(port, time.Second); err != nil { + timeout := 0 + if err := probeQueueHealthPath(port, timeout); err != nil { t.Errorf("probeQueueHealthPath(%d) = %s", port, err) } } diff --git a/pkg/apis/serving/k8s_validation.go b/pkg/apis/serving/k8s_validation.go index cc189a49e05b..f9e0682b56b1 100644 --- a/pkg/apis/serving/k8s_validation.go +++ b/pkg/apis/serving/k8s_validation.go @@ -283,7 +283,7 @@ func ValidateContainer(container corev1.Container, volumes sets.String) *apis.Fi // Ports errs = errs.Also(validateContainerPorts(container.Ports).ViaField("ports")) // Readiness Probes - errs = errs.Also(validateProbe(container.ReadinessProbe).ViaField("readinessProbe")) + errs = errs.Also(validateReadinessProbe(container.ReadinessProbe).ViaField("readinessProbe")) // Resources errs = errs.Also(validateResources(&container.Resources).ViaField("resources")) // SecurityContext @@ -421,6 +421,43 @@ func validateContainerPorts(ports []corev1.ContainerPort) *apis.FieldError { return errs } +func validateReadinessProbe(p *corev1.Probe) *apis.FieldError { + if p == nil { + return nil + } + + errs := validateProbe(p) + + if p.PeriodSeconds < 0 { + errs = errs.Also(apis.ErrOutOfBoundsValue(p.PeriodSeconds, 0, math.MaxInt32, "periodSeconds")) + } + + if p.SuccessThreshold < 1 { + errs = errs.Also(apis.ErrOutOfBoundsValue(p.SuccessThreshold, 1, math.MaxInt32, "successThreshold")) + } + + // PeriodSeconds == 0 indicates Knative's special probe with aggressive retries + if p.PeriodSeconds == 0 { + if p.FailureThreshold != 0 { + errs = errs.Also(apis.ErrDisallowedFields("failureThreshold")) + } + + if p.TimeoutSeconds != 0 { + errs = errs.Also(apis.ErrDisallowedFields("timeoutSeconds")) + } + } else { + if p.TimeoutSeconds < 1 { + errs = errs.Also(apis.ErrOutOfBoundsValue(p.TimeoutSeconds, 1, math.MaxInt32, "timeoutSeconds")) + } + + if p.FailureThreshold < 1 { + errs = errs.Also(apis.ErrOutOfBoundsValue(p.FailureThreshold, 1, math.MaxInt32, "failureThreshold")) + } + } + + return errs +} + func validateProbe(p *corev1.Probe) *apis.FieldError { if p == nil { return nil @@ -435,6 +472,10 @@ func validateProbe(p *corev1.Probe) *apis.FieldError { errs = errs.Also(apis.CheckDisallowedFields(*h.HTTPGet, *HTTPGetActionMask(h.HTTPGet))).ViaField("httpGet") case h.TCPSocket != nil: errs = errs.Also(apis.CheckDisallowedFields(*h.TCPSocket, *TCPSocketActionMask(h.TCPSocket))).ViaField("tcpSocket") + case h.Exec != nil: + errs = errs.Also(apis.CheckDisallowedFields(*h.Exec, *ExecActionMask(h.Exec))).ViaField("exec") + default: + errs = errs.Also(apis.ErrMissingField()) } return errs } diff --git a/pkg/apis/serving/k8s_validation_test.go b/pkg/apis/serving/k8s_validation_test.go index 26b24aac4a1f..a4bd9b2e1598 100644 --- a/pkg/apis/serving/k8s_validation_test.go +++ b/pkg/apis/serving/k8s_validation_test.go @@ -499,6 +499,10 @@ func TestContainerValidation(t *testing.T) { c: corev1.Container{ Image: "foo", ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, Handler: corev1.Handler{ HTTPGet: &corev1.HTTPGetAction{ Path: "/", @@ -512,11 +516,57 @@ func TestContainerValidation(t *testing.T) { }, }, want: nil, + }, { + name: "valid with exec probes ", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{}, + }, + }, + }, + want: nil, + }, { + name: "invalid with no probes ", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{}, + }, + }, + want: apis.ErrMissingField(""), }, { name: "invalid readiness http probe (has port)", c: corev1.Container{ Image: "foo", ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, Handler: corev1.Handler{ HTTPGet: &corev1.HTTPGetAction{ Path: "/", @@ -526,6 +576,56 @@ func TestContainerValidation(t *testing.T) { }, }, want: apis.ErrDisallowedFields("readinessProbe.httpGet.port"), + }, { + name: "invalid readiness probe (has failureThreshold while using special probe)", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 0, + FailureThreshold: 2, + SuccessThreshold: 1, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + }, + want: apis.ErrDisallowedFields("readinessProbe.failureThreshold"), + }, { + name: "invalid readiness probe (has timeoutSeconds while using special probe)", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 0, + TimeoutSeconds: 2, + SuccessThreshold: 1, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + }, + want: apis.ErrDisallowedFields("readinessProbe.timeoutSeconds"), + }, { + name: "out of bounds probe values", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: -1, + TimeoutSeconds: 0, + SuccessThreshold: 0, + FailureThreshold: 0, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{}, + }, + }, + }, + want: apis.ErrOutOfBoundsValue(-1, 0, math.MaxInt32, "readinessProbe.periodSeconds").Also( + apis.ErrOutOfBoundsValue(0, 1, math.MaxInt32, "readinessProbe.timeoutSeconds")).Also( + apis.ErrOutOfBoundsValue(0, 1, math.MaxInt32, "readinessProbe.successThreshold")).Also( + apis.ErrOutOfBoundsValue(0, 1, math.MaxInt32, "readinessProbe.failureThreshold")), }, { name: "disallowed security context field", c: corev1.Container{ diff --git a/pkg/apis/serving/v1alpha1/configuration_defaults_test.go b/pkg/apis/serving/v1alpha1/configuration_defaults_test.go index c0be3f5e3f3d..fd554331cde3 100644 --- a/pkg/apis/serving/v1alpha1/configuration_defaults_test.go +++ b/pkg/apis/serving/v1alpha1/configuration_defaults_test.go @@ -67,8 +67,9 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -96,9 +97,10 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -129,8 +131,9 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -149,7 +152,8 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Resources: defaultResources, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -164,8 +168,9 @@ func TestConfigurationDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, diff --git a/pkg/apis/serving/v1alpha1/revision_defaults_test.go b/pkg/apis/serving/v1alpha1/revision_defaults_test.go index 5ccf9f7b6347..c2572a2ab6ac 100644 --- a/pkg/apis/serving/v1alpha1/revision_defaults_test.go +++ b/pkg/apis/serving/v1alpha1/revision_defaults_test.go @@ -30,6 +30,13 @@ import ( "knative.dev/serving/pkg/apis/serving/v1beta1" ) +var defaultProbe = &corev1.Probe{ + SuccessThreshold: 1, + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, +} + func TestRevisionDefaulting(t *testing.T) { defer logtesting.ClearAll() tests := []struct { @@ -47,8 +54,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -66,8 +74,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -97,8 +106,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(123), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -127,7 +137,8 @@ func TestRevisionDefaulting(t *testing.T) { Name: "bar", ReadOnly: true, }}, - Resources: defaultResources, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, RevisionSpec: v1beta1.RevisionSpec{ ContainerConcurrency: 1, @@ -163,7 +174,8 @@ func TestRevisionDefaulting(t *testing.T) { Name: "bar", ReadOnly: true, }}, - Resources: defaultResources, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, ContainerConcurrency: 1, @@ -184,8 +196,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Image: "foo", - Resources: defaultResources, + Image: "foo", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -201,9 +214,10 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "foo", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "foo", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -227,8 +241,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -249,8 +264,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -273,8 +289,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, diff --git a/pkg/apis/serving/v1alpha1/service_defaults_test.go b/pkg/apis/serving/v1alpha1/service_defaults_test.go index fae03a6b179f..b24047f7ff0f 100644 --- a/pkg/apis/serving/v1alpha1/service_defaults_test.go +++ b/pkg/apis/serving/v1alpha1/service_defaults_test.go @@ -88,8 +88,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -124,9 +125,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -173,8 +175,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -207,8 +210,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -244,9 +248,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -294,8 +299,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -332,8 +338,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -368,8 +375,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -428,8 +436,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -486,8 +495,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -542,8 +552,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(99), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -580,9 +591,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "blah", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "blah", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -618,8 +630,9 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), }, DeprecatedContainer: &corev1.Container{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }, }, }, @@ -666,9 +679,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "blah", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "blah", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -720,9 +734,10 @@ func TestServiceDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "blah", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "blah", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, diff --git a/pkg/apis/serving/v1beta1/configuration_defaults_test.go b/pkg/apis/serving/v1beta1/configuration_defaults_test.go index 50b253942afe..70e63fa2d3a5 100644 --- a/pkg/apis/serving/v1beta1/configuration_defaults_test.go +++ b/pkg/apis/serving/v1beta1/configuration_defaults_test.go @@ -41,8 +41,9 @@ func TestConfigurationDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), @@ -71,9 +72,10 @@ func TestConfigurationDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), @@ -103,9 +105,10 @@ func TestConfigurationDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(60), diff --git a/pkg/apis/serving/v1beta1/revision_defaults.go b/pkg/apis/serving/v1beta1/revision_defaults.go index 9d66497480f6..dc212293b1d7 100644 --- a/pkg/apis/serving/v1beta1/revision_defaults.go +++ b/pkg/apis/serving/v1beta1/revision_defaults.go @@ -83,6 +83,16 @@ func (rs *RevisionSpec) SetDefaults(ctx context.Context) { container.Resources.Limits[corev1.ResourceMemory] = *rsrc } } + if container.ReadinessProbe == nil { + container.ReadinessProbe = &corev1.Probe{} + } + if container.ReadinessProbe.TCPSocket == nil && container.ReadinessProbe.HTTPGet == nil && container.ReadinessProbe.Exec == nil { + container.ReadinessProbe.TCPSocket = &corev1.TCPSocketAction{} + } + + if container.ReadinessProbe.SuccessThreshold == 0 { + container.ReadinessProbe.SuccessThreshold = 1 + } vms := container.VolumeMounts for i := range vms { diff --git a/pkg/apis/serving/v1beta1/revision_defaults_test.go b/pkg/apis/serving/v1beta1/revision_defaults_test.go index 5e05e5f0f9c0..a5bfe188c83b 100644 --- a/pkg/apis/serving/v1beta1/revision_defaults_test.go +++ b/pkg/apis/serving/v1beta1/revision_defaults_test.go @@ -36,6 +36,12 @@ var ( Requests: corev1.ResourceList{}, Limits: corev1.ResourceList{}, } + defaultProbe = &corev1.Probe{ + SuccessThreshold: 1, + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + } ignoreUnexportedResources = cmpopts.IgnoreUnexported(resource.Quantity{}) ) @@ -54,8 +60,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -82,8 +89,9 @@ func TestRevisionDefaulting(t *testing.T) { TimeoutSeconds: ptr.Int64(123), PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, @@ -114,7 +122,8 @@ func TestRevisionDefaulting(t *testing.T) { Name: "bar", ReadOnly: true, }}, - Resources: defaultResources, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, ContainerConcurrency: 1, @@ -130,6 +139,13 @@ func TestRevisionDefaulting(t *testing.T) { PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ Name: "foo", + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, }}, }, }, @@ -142,14 +158,34 @@ func TestRevisionDefaulting(t *testing.T) { Containers: []corev1.Container{{ Name: "foo", Resources: defaultResources, + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, }}, }, }, }, }, { - name: "partially initialized", + name: "no overwrite exec", in: &Revision{ - Spec: RevisionSpec{}, + Spec: RevisionSpec{ + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"echo", "hi"}, + }, + }, + }, + }}, + }, + }, }, want: &Revision{ Spec: RevisionSpec{ @@ -158,6 +194,31 @@ func TestRevisionDefaulting(t *testing.T) { Containers: []corev1.Container{{ Name: config.DefaultUserContainerName, Resources: defaultResources, + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"echo", "hi"}, + }, + }, + }, + }}, + }, + }, + }, + }, { + name: "partially initialized", + in: &Revision{ + Spec: RevisionSpec{}, + }, + want: &Revision{ + Spec: RevisionSpec{ + TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, }, diff --git a/pkg/apis/serving/v1beta1/service_defaults_test.go b/pkg/apis/serving/v1beta1/service_defaults_test.go index 73125af8d746..32c73ff16008 100644 --- a/pkg/apis/serving/v1beta1/service_defaults_test.go +++ b/pkg/apis/serving/v1beta1/service_defaults_test.go @@ -45,8 +45,9 @@ func TestServiceDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), @@ -85,9 +86,10 @@ func TestServiceDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), @@ -127,9 +129,10 @@ func TestServiceDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(60), @@ -181,9 +184,10 @@ func TestServiceDefaulting(t *testing.T) { Spec: RevisionSpec{ PodSpec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: config.DefaultUserContainerName, - Image: "busybox", - Resources: defaultResources, + Name: config.DefaultUserContainerName, + Image: "busybox", + Resources: defaultResources, + ReadinessProbe: defaultProbe, }}, }, TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), diff --git a/pkg/reconciler/revision/resources/deploy.go b/pkg/reconciler/revision/resources/deploy.go index 48742790a5a4..22c91212bee5 100644 --- a/pkg/reconciler/revision/resources/deploy.go +++ b/pkg/reconciler/revision/resources/deploy.go @@ -134,8 +134,15 @@ func makePodSpec(rev *v1alpha1.Revision, loggingConfig *logging.Config, observab userContainer.TerminationMessagePolicy = corev1.TerminationMessageFallbackToLogsOnError } + if userContainer.ReadinessProbe != nil { + if userContainer.ReadinessProbe.HTTPGet != nil || userContainer.ReadinessProbe.TCPSocket != nil { + // HTTP and TCP ReadinessProbes are executed by the queue-proxy directly against the + // user-container instead of via kubelet. + userContainer.ReadinessProbe = nil + } + } + // If the client provides probes, we should fill in the port for them. - rewriteUserProbe(userContainer.ReadinessProbe, userPortInt) rewriteUserProbe(userContainer.LivenessProbe, userPortInt) podSpec := &corev1.PodSpec{ diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go index 184068988225..fc31d38621c3 100644 --- a/pkg/reconciler/revision/resources/deploy_test.go +++ b/pkg/reconciler/revision/resources/deploy_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -69,10 +70,18 @@ var ( } defaultQueueContainer = &corev1.Container{ - Name: QueueContainerName, - Resources: createQueueResources(make(map[string]string), &corev1.Container{}), - Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "0"}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + }, SecurityContext: queueSecurityContext, Env: []corev1.EnvVar{{ Name: "SERVING_NAMESPACE", @@ -251,6 +260,16 @@ func withEnvVar(name, value string) containerOption { } } +func withEmptyProbeArgs(container *corev1.Container) { + container.Args = append(container.Args, []string{"--readiness-probe", "null"}...) +} + +func withArgs(args []string) containerOption { + return func(container *corev1.Container) { + container.Args = append(container.Args, args...) + } +} + func withInternalVolumeMount() containerOption { return func(container *corev1.Container) { container.VolumeMounts = append(container.VolumeMounts, internalVolumeMount) @@ -394,6 +413,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("USER_PORT", "8888"), + withEmptyProbeArgs, ), }), }, { @@ -437,6 +457,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("USER_PORT", "8888"), + withEmptyProbeArgs, ), }, withAppendedVolumes(corev1.Volume{ Name: "asdf", @@ -458,6 +479,7 @@ func TestMakePodSpec(t *testing.T) { userContainer(), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), + withEmptyProbeArgs, ), }), }, { @@ -481,6 +503,7 @@ func TestMakePodSpec(t *testing.T) { }), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), + withEmptyProbeArgs, ), }), }, { @@ -499,6 +522,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("SERVING_CONFIGURATION", "parent-config"), withEnvVar("CONTAINER_CONCURRENCY", "1"), + withEmptyProbeArgs, ), }), }, { @@ -514,11 +538,34 @@ func TestMakePodSpec(t *testing.T) { cc: &deployment.Config{}, want: podSpec( []corev1.Container{ - userContainer( - withHTTPQPReadinessProbe, + userContainer(), + queueContainer( + withEnvVar("CONTAINER_CONCURRENCY", "0"), + withArgs([]string{"--readiness-probe", fmt.Sprintf(`{"httpGet":{"path":"/","port":%d,"host":"127.0.0.1","scheme":"HTTP","httpHeaders":[{"name":"K-Kubelet-Probe","value":"queue"}]}}`, v1alpha1.DefaultUserPort)}), ), + }), + }, { + name: "with tcp readiness probe", + rev: revision(func(revision *v1alpha1.Revision) { + container(revision.Spec.GetContainer(), + withReadinessProbe(corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromInt(12345), + }, + }), + ) + }), + lc: &logging.Config{}, + oc: &metrics.ObservabilityConfig{}, + ac: &autoscaler.Config{}, + cc: &deployment.Config{}, + want: podSpec( + []corev1.Container{ + userContainer(), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), + withArgs([]string{"--readiness-probe", fmt.Sprintf(`{"tcpSocket":{"port":%d,"host":"127.0.0.1"}}`, v1alpha1.DefaultUserPort)}), ), }), }, { @@ -543,6 +590,7 @@ func TestMakePodSpec(t *testing.T) { ), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), + withArgs([]string{"--readiness-probe", `{"tcpSocket":{"port":8080,"host":"127.0.0.1"},"successThreshold":1}`}), ), }), }, { @@ -576,6 +624,7 @@ func TestMakePodSpec(t *testing.T) { ), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), + withEmptyProbeArgs, ), }), }, { @@ -602,6 +651,7 @@ func TestMakePodSpec(t *testing.T) { ), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), + withEmptyProbeArgs, ), }), }, { @@ -620,6 +670,7 @@ func TestMakePodSpec(t *testing.T) { withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("ENABLE_VAR_LOG_COLLECTION", "true"), withInternalVolumeMount(), + withEmptyProbeArgs, ), }, func(podSpec *corev1.PodSpec) { @@ -686,6 +737,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("SERVING_SERVICE", ""), + withEmptyProbeArgs, ), }), }} @@ -695,7 +747,6 @@ func TestMakePodSpec(t *testing.T) { quantityComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 }) - got := makePodSpec(test.rev, test.lc, test.oc, test.ac, test.cc) if diff := cmp.Diff(test.want, got, quantityComparer); diff != "" { t.Errorf("makePodSpec (-want, +got) = %v", diff) @@ -712,7 +763,6 @@ func TestMakePodSpec(t *testing.T) { *test.rev.Spec.DeprecatedContainer, } test.rev.Spec.DeprecatedContainer = nil - got := makePodSpec(test.rev, test.lc, test.oc, test.ac, test.cc) if diff := cmp.Diff(test.want, got, quantityComparer); diff != "" { t.Errorf("makePodSpec (-want, +got) = %v", diff) diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index 90a25c0ef65a..e3aad60d5038 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "knative.dev/pkg/logging" pkgmetrics "knative.dev/pkg/metrics" "knative.dev/pkg/ptr" @@ -33,9 +34,15 @@ import ( "knative.dev/serving/pkg/autoscaler" "knative.dev/serving/pkg/deployment" "knative.dev/serving/pkg/metrics" + "knative.dev/serving/pkg/network" + "knative.dev/serving/pkg/queue" + "knative.dev/serving/pkg/queue/readiness" ) -const requestQueueHTTPPortName = "queue-port" +const ( + localAddress = "127.0.0.1" + requestQueueHTTPPortName = "queue-port" +) var ( queueHTTPPort = corev1.ContainerPort{ @@ -58,23 +65,6 @@ var ( ContainerPort: int32(networking.UserQueueMetricsPort), }} - queueReadinessProbe = &corev1.Probe{ - Handler: corev1.Handler{ - Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "true"}, - }, - }, - // We want to mark the service as not ready as soon as the - // PreStop handler is called, so we need to check a little - // bit more often than the default. It is a small - // sacrifice for a low rate of 503s. - PeriodSeconds: 1, - // We keep the connection open for a while because we're - // actively probing the user-container on that endpoint and - // thus don't want to be limited by K8s granularity here. - TimeoutSeconds: 10, - } - queueSecurityContext = &corev1.SecurityContext{ AllowPrivilegeEscalation: ptr.Bool(false), } @@ -156,6 +146,51 @@ func createResourcePercentageFromAnnotations(m map[string]string, k string) (boo return true, float32(value / 100) } +func makeQueueProbe(in *corev1.Probe) *corev1.Probe { + if in == nil || in.PeriodSeconds == 0 { + out := &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "0"}, + }, + }, + // We want to mark the service as not ready as soon as the + // PreStop handler is called, so we need to check a little + // bit more often than the default. It is a small + // sacrifice for a low rate of 503s. + PeriodSeconds: 1, + // We keep the connection open for a while because we're + // actively probing the user-container on that endpoint and + // thus don't want to be limited by K8s granularity here. + TimeoutSeconds: 10, + } + + if in != nil { + out.InitialDelaySeconds = in.InitialDelaySeconds + } + return out + } + + timeout := 1 + + if in.TimeoutSeconds > 1 { + timeout = int(in.TimeoutSeconds) + } + + return &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", strconv.Itoa(timeout)}, + }, + }, + PeriodSeconds: in.PeriodSeconds, + TimeoutSeconds: int32(timeout), + SuccessThreshold: in.SuccessThreshold, + FailureThreshold: in.FailureThreshold, + InitialDelaySeconds: in.InitialDelaySeconds, + } +} + // makeQueueContainer creates the container spec for the queue sidecar. func makeQueueContainer(rev *v1alpha1.Revision, loggingConfig *logging.Config, observabilityConfig *metrics.ObservabilityConfig, autoscalerConfig *autoscaler.Config, deploymentConfig *deployment.Config) *corev1.Container { @@ -191,13 +226,21 @@ func makeQueueContainer(rev *v1alpha1.Revision, loggingConfig *logging.Config, o volumeMounts = append(volumeMounts, internalVolumeMount) } + rp := rev.Spec.GetContainer().ReadinessProbe.DeepCopy() + + applyReadinessProbeDefaults(rp, userPort) + + // TODO(joshrider) bubble up error instead of squashing it here + probeJSON, _ := readiness.EncodeProbe(rp) + return &corev1.Container{ Name: QueueContainerName, Image: deploymentConfig.QueueSidecarImage, Resources: createQueueResources(rev.GetAnnotations(), rev.Spec.GetContainer()), Ports: ports, - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: makeQueueProbe(rp), VolumeMounts: volumeMounts, + Args: []string{"--readiness-probe", probeJSON}, SecurityContext: queueSecurityContext, Env: []corev1.EnvVar{{ Name: "SERVING_NAMESPACE", @@ -270,3 +313,39 @@ func makeQueueContainer(rev *v1alpha1.Revision, loggingConfig *logging.Config, o }}, } } + +func applyReadinessProbeDefaults(p *corev1.Probe, port int32) { + if p == nil { + return + } + + switch { + case p.HTTPGet != nil: + p.HTTPGet.Host = localAddress + p.HTTPGet.Port = intstr.FromInt(int(port)) + + if p.HTTPGet.Scheme == "" { + p.HTTPGet.Scheme = corev1.URISchemeHTTP + } + + p.HTTPGet.HTTPHeaders = append(p.HTTPGet.HTTPHeaders, corev1.HTTPHeader{ + Name: network.KubeletProbeHeaderName, + Value: queue.Name, + }) + case p.TCPSocket != nil: + p.TCPSocket.Host = localAddress + p.TCPSocket.Port = intstr.FromInt(int(port)) + case p.Exec != nil: + // Use default TCP connect probe to ensure data path is open from queue-proxy to + // user-container. User-defined ExecProbe will still be run on user-container. + p.TCPSocket = &corev1.TCPSocketAction{} + p.SuccessThreshold = 1 + p.TCPSocket.Host = localAddress + p.TCPSocket.Port = intstr.FromInt(int(port)) + p.Exec = nil + } + + if p.PeriodSeconds > 0 && p.TimeoutSeconds < 1 { + p.TimeoutSeconds = 1 + } +} diff --git a/pkg/reconciler/revision/resources/queue_test.go b/pkg/reconciler/revision/resources/queue_test.go index d8b581e40377..00e5ac75a22a 100644 --- a/pkg/reconciler/revision/resources/queue_test.go +++ b/pkg/reconciler/revision/resources/queue_test.go @@ -17,18 +17,19 @@ limitations under the License. package resources import ( + "encoding/json" "sort" "strconv" "testing" - "knative.dev/serving/pkg/resources" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap/zapcore" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "knative.dev/pkg/logging" pkgmetrics "knative.dev/pkg/metrics" _ "knative.dev/pkg/metrics/testing" @@ -42,8 +43,27 @@ import ( "knative.dev/serving/pkg/autoscaler" "knative.dev/serving/pkg/deployment" "knative.dev/serving/pkg/metrics" + "knative.dev/serving/pkg/network" + "knative.dev/serving/pkg/resources" ) +var defaultKnativeQReadinessProbe = &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "0"}, + }, + }, + // We want to mark the service as not ready as soon as the + // PreStop handler is called, so we need to check a little + // bit more often than the default. It is a small + // sacrifice for a low rate of 503s. + PeriodSeconds: 1, + // We keep the connection open for a while because we're + // actively probing the user-container on that endpoint and + // thus don't want to be limited by K8s granularity here. + TimeoutSeconds: 10, +} + func TestMakeQueueContainer(t *testing.T) { tests := []struct { name string @@ -77,7 +97,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(nil), @@ -117,7 +137,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTP2Port), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -155,7 +175,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -193,7 +213,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -232,7 +252,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -267,7 +287,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -298,7 +318,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -332,7 +352,7 @@ func TestMakeQueueContainer(t *testing.T) { Name: QueueContainerName, Resources: createQueueResources(make(map[string]string), &corev1.Container{}), Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Env: env(map[string]string{ @@ -354,6 +374,7 @@ func TestMakeQueueContainer(t *testing.T) { got := makeQueueContainer(test.rev, test.lc, test.oc, test.ac, test.cc) sortEnv(got.Env) + test.want.Args = probeArgs(test.want.Args, test.rev.Spec.GetContainer().ReadinessProbe) if diff := cmp.Diff(test.want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { t.Errorf("makeQueueContainer (-want, +got) = %v", diff) } @@ -421,7 +442,7 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { }, }, Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -476,7 +497,7 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { }, }, Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -530,7 +551,7 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { }, }, Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -584,7 +605,7 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { }, }, Ports: append(queueNonServingPorts, queueHTTPPort), - ReadinessProbe: queueReadinessProbe, + ReadinessProbe: defaultKnativeQReadinessProbe, SecurityContext: queueSecurityContext, // These changed based on the Revision and configs passed in. Image: "alpine", @@ -598,6 +619,7 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { t.Run(test.name, func(t *testing.T) { got := makeQueueContainer(test.rev, test.lc, test.oc, test.ac, test.cc) sortEnv(got.Env) + test.want.Args = probeArgs(test.want.Args, test.rev.Spec.GetContainer().ReadinessProbe) if diff := cmp.Diff(test.want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { t.Errorf("makeQueueContainerWithPercentageAnnotation (-want, +got) = %v", diff) } @@ -617,6 +639,371 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { } } +func TestProbeGenerationHTTPDefaults(t *testing.T) { + rev := &v1alpha1.Revision{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + UID: "1234", + }, + Spec: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + }, + }}, + }, + }, + }, + } + + expectedProbe := &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: "127.0.0.1", + Path: "/", + Port: intstr.FromInt(int(v1alpha1.DefaultUserPort)), + Scheme: corev1.URISchemeHTTP, + HTTPHeaders: []corev1.HTTPHeader{{ + Name: network.KubeletProbeHeaderName, + Value: "queue", + }}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + } + probeBytes, err := json.Marshal(expectedProbe) + if err != nil { + t.Fatalf("Failed to marshall readiness probe %#v", err) + } + + lc := &logging.Config{} + oc := &metrics.ObservabilityConfig{} + ac := &autoscaler.Config{} + cc := &deployment.Config{} + want := &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "10"}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{}), + SecurityContext: queueSecurityContext, + Args: []string{"--readiness-probe", string(probeBytes)}, + } + + got := makeQueueContainer(rev, lc, oc, ac, cc) + sortEnv(got.Env) + if diff := cmp.Diff(want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { + t.Errorf("makeQueueContainer(-want, +got) = %v", diff) + } +} + +func TestProbeGenerationHTTP(t *testing.T) { + userPort := 12345 + probePath := "/health" + + rev := &v1alpha1.Revision{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + UID: "1234", + }, + Spec: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + Ports: []v1.ContainerPort{{ + ContainerPort: int32(userPort), + }}, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: probePath, + Scheme: corev1.URISchemeHTTPS, + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 10, + }, + }}, + }, + }, + }, + } + + expectedProbe := &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: "127.0.0.1", + Path: probePath, + Port: intstr.FromInt(userPort), + Scheme: corev1.URISchemeHTTPS, + HTTPHeaders: []corev1.HTTPHeader{{ + Name: network.KubeletProbeHeaderName, + Value: "queue", + }}, + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 10, + } + probeBytes, err := json.Marshal(expectedProbe) + if err != nil { + t.Fatalf("Failed to marshall readiness probe %#v", err) + } + + lc := &logging.Config{} + oc := &metrics.ObservabilityConfig{} + ac := &autoscaler.Config{} + cc := &deployment.Config{} + want := &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "10"}, + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 10, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{"USER_PORT": strconv.Itoa(userPort)}), + SecurityContext: queueSecurityContext, + Args: []string{"--readiness-probe", string(probeBytes)}, + } + + got := makeQueueContainer(rev, lc, oc, ac, cc) + sortEnv(got.Env) + if diff := cmp.Diff(want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { + t.Errorf("makeQueueContainer(-want, +got) = %v", diff) + } +} + +func TestTCPProbeGeneration(t *testing.T) { + userPort := 12345 + tests := []struct { + name string + rev v1alpha1.RevisionSpec + want *corev1.Container + wantProbe *corev1.Probe + }{{ + name: "knative tcp probe", + wantProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromInt(userPort), + }, + }, + PeriodSeconds: 0, + SuccessThreshold: 3, + }, + rev: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + Ports: []v1.ContainerPort{{ + ContainerPort: int32(userPort), + }}, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + PeriodSeconds: 0, + SuccessThreshold: 3, + }, + }}, + }, + }, + }, + want: &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "0"}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 10, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{"USER_PORT": strconv.Itoa(userPort)}), + SecurityContext: queueSecurityContext, + }, + }, { + name: "tcp defaults", + rev: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + PeriodSeconds: 1, + }, + }}, + }, + }, + }, + wantProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromInt(int(v1alpha1.DefaultUserPort)), + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 1, + }, + want: &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "1"}, + }, + }, + PeriodSeconds: 1, + TimeoutSeconds: 1, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{}), + SecurityContext: queueSecurityContext, + }, + }, { + name: "user defined tcp probe", + wantProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromInt(userPort), + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 7, + InitialDelaySeconds: 3, + }, + rev: v1alpha1.RevisionSpec{ + RevisionSpec: v1beta1.RevisionSpec{ + ContainerConcurrency: 1, + TimeoutSeconds: ptr.Int64(45), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + Ports: []v1.ContainerPort{{ + ContainerPort: int32(userPort), + }}, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + PeriodSeconds: 2, + TimeoutSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 7, + InitialDelaySeconds: 3, + }, + }}, + }, + }, + }, + want: &corev1.Container{ + // These are effectively constant + Name: QueueContainerName, + Resources: createQueueResources(make(map[string]string), &corev1.Container{}), + Ports: append(queueNonServingPorts, queueHTTPPort), + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "15"}, + }, + }, + PeriodSeconds: 2, + TimeoutSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 7, + InitialDelaySeconds: 3, + }, + // These changed based on the Revision and configs passed in. + Env: env(map[string]string{"USER_PORT": strconv.Itoa(userPort)}), + SecurityContext: queueSecurityContext, + }, + }} + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + lc := &logging.Config{} + oc := &metrics.ObservabilityConfig{} + ac := &autoscaler.Config{} + cc := &deployment.Config{} + probeBytes, err := json.Marshal(test.wantProbe) + if err != nil { + t.Fatalf("Failed to marshall readiness probe %#v", err) + } + test.want.Args = []string{"--readiness-probe", string(probeBytes)} + + testRev := &v1alpha1.Revision{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + UID: "1234", + }, + Spec: test.rev, + } + + got := makeQueueContainer(testRev, lc, oc, ac, cc) + sortEnv(got.Env) + if diff := cmp.Diff(test.want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { + t.Errorf("makeQueueContainer (-want, +got) = %v", diff) + } + }) + } +} + var defaultEnv = map[string]string{ "SERVING_NAMESPACE": "foo", "SERVING_SERVICE": "", @@ -638,6 +1025,18 @@ var defaultEnv = map[string]string{ "INTERNAL_VOLUME_PATH": internalVolumePath, } +func probeArgs(containerArgs []string, probe *corev1.Probe) []string { + probeArgs := []string{"--readiness-probe", "null"} + if probe != nil { + probeBytes, err := json.Marshal(probe) + if err != nil { + return append(containerArgs, probeArgs...) + } + return append(containerArgs, []string{"--readiness-probe", string(probeBytes)}...) + } + return append(containerArgs, probeArgs...) +} + func env(overrides map[string]string) []corev1.EnvVar { values := resources.UnionMaps(defaultEnv, overrides) From b51d131f3d182eb6060a2231da4805ad2cc36868 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Fri, 12 Jul 2019 11:13:07 -0400 Subject: [PATCH 2/9] validate probe handler count Co-authored-by: Shash Reddy --- pkg/apis/serving/k8s_validation.go | 37 +++++++++++++++++++------ pkg/apis/serving/k8s_validation_test.go | 24 ++++++++++++++-- 2 files changed, 50 insertions(+), 11 deletions(-) diff --git a/pkg/apis/serving/k8s_validation.go b/pkg/apis/serving/k8s_validation.go index f9e0682b56b1..dde0f43c4917 100644 --- a/pkg/apis/serving/k8s_validation.go +++ b/pkg/apis/serving/k8s_validation.go @@ -467,15 +467,34 @@ func validateProbe(p *corev1.Probe) *apis.FieldError { h := p.Handler errs = errs.Also(apis.CheckDisallowedFields(h, *HandlerMask(&h))) - switch { - case h.HTTPGet != nil: - errs = errs.Also(apis.CheckDisallowedFields(*h.HTTPGet, *HTTPGetActionMask(h.HTTPGet))).ViaField("httpGet") - case h.TCPSocket != nil: - errs = errs.Also(apis.CheckDisallowedFields(*h.TCPSocket, *TCPSocketActionMask(h.TCPSocket))).ViaField("tcpSocket") - case h.Exec != nil: - errs = errs.Also(apis.CheckDisallowedFields(*h.Exec, *ExecActionMask(h.Exec))).ViaField("exec") - default: - errs = errs.Also(apis.ErrMissingField()) + numHandlers := 0 + + if h.HTTPGet != nil { + if numHandlers > 0 { + errs = errs.Also(apis.ErrDisallowedFields("httpGet")) + } else { + numHandlers++ + errs = errs.Also(apis.CheckDisallowedFields(*h.HTTPGet, *HTTPGetActionMask(h.HTTPGet))).ViaField("httpGet") + } + } + if h.TCPSocket != nil { + if numHandlers > 0 { + errs = errs.Also(apis.ErrDisallowedFields("tcpSocket")) + } else { + numHandlers++ + errs = errs.Also(apis.CheckDisallowedFields(*h.TCPSocket, *TCPSocketActionMask(h.TCPSocket))).ViaField("tcpSocket") + } + } + if h.Exec != nil { + if numHandlers > 0 { + errs = errs.Also(apis.ErrDisallowedFields("exec")) + } else { + numHandlers++ + errs = errs.Also(apis.CheckDisallowedFields(*h.Exec, *ExecActionMask(h.Exec))).ViaField("exec") + } + } + if numHandlers == 0 { + errs = errs.Also(apis.ErrMissingField("handler")) } return errs } diff --git a/pkg/apis/serving/k8s_validation_test.go b/pkg/apis/serving/k8s_validation_test.go index a4bd9b2e1598..50e27f02b04d 100644 --- a/pkg/apis/serving/k8s_validation_test.go +++ b/pkg/apis/serving/k8s_validation_test.go @@ -539,7 +539,7 @@ func TestContainerValidation(t *testing.T) { }, want: nil, }, { - name: "invalid with no probes ", + name: "invalid with no handler", c: corev1.Container{ Image: "foo", ReadinessProbe: &corev1.Probe{ @@ -557,7 +557,27 @@ func TestContainerValidation(t *testing.T) { Handler: corev1.Handler{}, }, }, - want: apis.ErrMissingField(""), + want: apis.ErrMissingField("livenessProbe.handler"), + }, { + name: "invalid with multiple handlers", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + Exec: &corev1.ExecAction{}, + TCPSocket: &corev1.TCPSocketAction{}, + }, + }, + }, + want: apis.ErrDisallowedFields("readinessProbe.exec").Also( + apis.ErrDisallowedFields("readinessProbe.tcpSocket")), }, { name: "invalid readiness http probe (has port)", c: corev1.Container{ From d7ab18d688efcd7f617c126fd7150edf778cd8f4 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Fri, 12 Jul 2019 13:38:52 -0400 Subject: [PATCH 3/9] move guard to switch Co-authored-by: Shash Reddy --- pkg/reconciler/revision/resources/queue.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index e3aad60d5038..e30aed16217d 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -315,11 +315,9 @@ func makeQueueContainer(rev *v1alpha1.Revision, loggingConfig *logging.Config, o } func applyReadinessProbeDefaults(p *corev1.Probe, port int32) { - if p == nil { - return - } - switch { + case p == nil: + return case p.HTTPGet != nil: p.HTTPGet.Host = localAddress p.HTTPGet.Port = intstr.FromInt(int(port)) From 08b12d9ac195915bd715b3e932680adf537eb371 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Mon, 15 Jul 2019 09:49:48 -0400 Subject: [PATCH 4/9] remove unreachable branch --- pkg/apis/serving/k8s_validation.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/apis/serving/k8s_validation.go b/pkg/apis/serving/k8s_validation.go index dde0f43c4917..ec840e49059e 100644 --- a/pkg/apis/serving/k8s_validation.go +++ b/pkg/apis/serving/k8s_validation.go @@ -470,12 +470,8 @@ func validateProbe(p *corev1.Probe) *apis.FieldError { numHandlers := 0 if h.HTTPGet != nil { - if numHandlers > 0 { - errs = errs.Also(apis.ErrDisallowedFields("httpGet")) - } else { - numHandlers++ - errs = errs.Also(apis.CheckDisallowedFields(*h.HTTPGet, *HTTPGetActionMask(h.HTTPGet))).ViaField("httpGet") - } + numHandlers++ + errs = errs.Also(apis.CheckDisallowedFields(*h.HTTPGet, *HTTPGetActionMask(h.HTTPGet))).ViaField("httpGet") } if h.TCPSocket != nil { if numHandlers > 0 { From 3f9dbe2f1a9286486e2e4c81144e5dc4a29e2ca0 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Mon, 15 Jul 2019 16:51:49 -0700 Subject: [PATCH 5/9] Address comments - merge logic for knative probes and user defined probes - use probe-period as argument name - pass probe as environment variable instead of container args Signed-off-by: Shash Reddy --- cmd/queue/main.go | 45 ++++++----------- pkg/queue/readiness/probe_encoding.go | 6 +++ pkg/queue/readiness/probe_encoding_test.go | 12 +++++ .../revision/resources/deploy_test.go | 38 ++++++-------- pkg/reconciler/revision/resources/queue.go | 4 +- .../revision/resources/queue_test.go | 50 +++++++++---------- 6 files changed, 72 insertions(+), 83 deletions(-) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 51f8dcef59d3..4c7f722c779c 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -115,7 +115,6 @@ var ( "The response time in millisecond", stats.UnitMilliseconds) readinessProbeTimeout = flag.Int("probe", -1, "run readiness probe with given timeout") - ucProbe = flag.String("readiness-probe", "", "JSON readiness probe configuration for user container") ) type config struct { @@ -137,6 +136,7 @@ type config struct { ServingLoggingLevel string `split_words:"true" required:"true"` ServingRequestMetricsBackend string `split_words:"true" required:"true"` ServingRequestLogTemplate string `split_words:"true" required:"true"` + ServingReadinessProbe string `split_words:"true" required:"true"` } func initConfig(env config) { @@ -234,54 +234,37 @@ func createAdminHandlers(p *readiness.Probe) *http.ServeMux { return mux } - func probeQueueHealthPath(port int, timeoutSeconds int) error { url := fmt.Sprintf(healthURLTemplate, port) - - // Use aggressive sub-second retries. - if timeoutSeconds == 0 { - return knativeProbe(url) + timeoutDuration := readiness.PollTimeout + if timeoutSeconds != 0 { + timeoutDuration = time.Duration(timeoutSeconds) * time.Second } - httpClient := &http.Client{ Transport: &http.Transport{ // Do not use the cached connection DisableKeepAlives: true, }, - Timeout: time.Duration(timeoutSeconds) * time.Second, - } - - res, err := httpClient.Get(url) - - if err != nil { - return errors.Wrap(err, "failed to probe") - } - if !health.IsHTTPProbeReady(res) { - return errors.New("probe returned not ready") + Timeout: timeoutDuration, } - return nil -} + stopCh := make(chan struct{}) + timeCh := time.After(timeoutDuration) -func knativeProbe(url string) error { - httpClient := &http.Client{ - Transport: &http.Transport{ - // Do not use the cached connection - DisableKeepAlives: true, - }, - Timeout: readiness.PollTimeout, - } + go func() { + <-timeCh + close(stopCh) + }() var lastErr error - - timeoutErr := wait.PollImmediate(aggressivePollInterval, readiness.PollTimeout, func() (bool, error) { + timeoutErr := wait.PollImmediateUntil(aggressivePollInterval, func() (bool, error) { var res *http.Response if res, lastErr = httpClient.Get(url); res == nil { return false, nil } defer res.Body.Close() return health.IsHTTPProbeReady(res), nil - }) + }, stopCh) if lastErr != nil { return errors.Wrap(lastErr, "failed to probe") @@ -361,7 +344,7 @@ func main() { StatChan: statChan, }, time.Now()) - coreProbe, err := readiness.DecodeProbe(*ucProbe) + coreProbe, err := readiness.DecodeProbe(env.ServingReadinessProbe) if err != nil { logger.Fatalw("Queue container failed to parse readiness probe", zap.Error(err)) } diff --git a/pkg/queue/readiness/probe_encoding.go b/pkg/queue/readiness/probe_encoding.go index ba59b949266f..0d12416ae616 100644 --- a/pkg/queue/readiness/probe_encoding.go +++ b/pkg/queue/readiness/probe_encoding.go @@ -19,6 +19,8 @@ package readiness import ( "encoding/json" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" ) @@ -33,6 +35,10 @@ func DecodeProbe(jsonProbe string) (*corev1.Probe, error) { // EncodeProbe takes *corev1.Probe object and returns marshalled Probe JSON string and an error. func EncodeProbe(rp *corev1.Probe) (string, error) { + if rp == nil { + return "", errors.New("cannot encode nil probe") + } + probeJSON, err := json.Marshal(rp) if err != nil { return "", err diff --git a/pkg/queue/readiness/probe_encoding_test.go b/pkg/queue/readiness/probe_encoding_test.go index 5868d1db3768..7ae97b4ada40 100644 --- a/pkg/queue/readiness/probe_encoding_test.go +++ b/pkg/queue/readiness/probe_encoding_test.go @@ -84,3 +84,15 @@ func TestEncodeProbe(t *testing.T) { t.Errorf("Probe diff: %s; got %v, want %v", diff, jsonProbe, wantProbe) } } + +func TestEncodeNilProbe(t *testing.T) { + jsonProbe, err := EncodeProbe(nil) + + if err == nil { + t.Errorf("Expected error") + } + + if jsonProbe != "" { + t.Errorf("Expected empty probe string; got %s", jsonProbe) + } +} diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go index fc31d38621c3..d0d58761f398 100644 --- a/pkg/reconciler/revision/resources/deploy_test.go +++ b/pkg/reconciler/revision/resources/deploy_test.go @@ -17,7 +17,6 @@ limitations under the License. package resources import ( - "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -260,10 +259,6 @@ func withEnvVar(name, value string) containerOption { } } -func withEmptyProbeArgs(container *corev1.Container) { - container.Args = append(container.Args, []string{"--readiness-probe", "null"}...) -} - func withArgs(args []string) containerOption { return func(container *corev1.Container) { container.Args = append(container.Args, args...) @@ -413,7 +408,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("USER_PORT", "8888"), - withEmptyProbeArgs, + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -457,7 +452,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("USER_PORT", "8888"), - withEmptyProbeArgs, + withEnvVar("SERVING_READINESS_PROBE", ""), ), }, withAppendedVolumes(corev1.Volume{ Name: "asdf", @@ -479,7 +474,7 @@ func TestMakePodSpec(t *testing.T) { userContainer(), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), - withEmptyProbeArgs, + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -503,7 +498,7 @@ func TestMakePodSpec(t *testing.T) { }), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), - withEmptyProbeArgs, + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -522,7 +517,7 @@ func TestMakePodSpec(t *testing.T) { queueContainer( withEnvVar("SERVING_CONFIGURATION", "parent-config"), withEnvVar("CONTAINER_CONCURRENCY", "1"), - withEmptyProbeArgs, + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -541,7 +536,7 @@ func TestMakePodSpec(t *testing.T) { userContainer(), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), - withArgs([]string{"--readiness-probe", fmt.Sprintf(`{"httpGet":{"path":"/","port":%d,"host":"127.0.0.1","scheme":"HTTP","httpHeaders":[{"name":"K-Kubelet-Probe","value":"queue"}]}}`, v1alpha1.DefaultUserPort)}), + withEnvVar("SERVING_READINESS_PROBE", `{"httpGet":{"path":"/","port":8080,"host":"127.0.0.1","scheme":"HTTP","httpHeaders":[{"name":"K-Kubelet-Probe","value":"queue"}]}}`), ), }), }, { @@ -565,16 +560,14 @@ func TestMakePodSpec(t *testing.T) { userContainer(), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), - withArgs([]string{"--readiness-probe", fmt.Sprintf(`{"tcpSocket":{"port":%d,"host":"127.0.0.1"}}`, v1alpha1.DefaultUserPort)}), + withEnvVar("SERVING_READINESS_PROBE", `{"tcpSocket":{"port":8080,"host":"127.0.0.1"}}`), ), }), }, { name: "with shell readiness probe", rev: revision(func(revision *v1alpha1.Revision) { container(revision.Spec.GetContainer(), - withExecReadinessProbe( - []string{"echo", "hello"}, - ), + withExecReadinessProbe([]string{"echo", "hello"}), ) }), lc: &logging.Config{}, @@ -584,13 +577,10 @@ func TestMakePodSpec(t *testing.T) { want: podSpec( []corev1.Container{ userContainer( - withExecReadinessProbe( - []string{"echo", "hello"}, - ), - ), + withExecReadinessProbe([]string{"echo", "hello"})), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), - withArgs([]string{"--readiness-probe", `{"tcpSocket":{"port":8080,"host":"127.0.0.1"},"successThreshold":1}`}), + withEnvVar("SERVING_READINESS_PROBE", `{"tcpSocket":{"port":8080,"host":"127.0.0.1"},"successThreshold":1}`), ), }), }, { @@ -624,7 +614,7 @@ func TestMakePodSpec(t *testing.T) { ), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), - withEmptyProbeArgs, + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -651,7 +641,7 @@ func TestMakePodSpec(t *testing.T) { ), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), - withEmptyProbeArgs, + withEnvVar("SERVING_READINESS_PROBE", ""), ), }), }, { @@ -670,7 +660,7 @@ func TestMakePodSpec(t *testing.T) { withEnvVar("CONTAINER_CONCURRENCY", "1"), withEnvVar("ENABLE_VAR_LOG_COLLECTION", "true"), withInternalVolumeMount(), - withEmptyProbeArgs, + withEnvVar("SERVING_READINESS_PROBE", ""), ), }, func(podSpec *corev1.PodSpec) { @@ -736,8 +726,8 @@ func TestMakePodSpec(t *testing.T) { ), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "1"), + withEnvVar("SERVING_READINESS_PROBE", ""), withEnvVar("SERVING_SERVICE", ""), - withEmptyProbeArgs, ), }), }} diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index e30aed16217d..f0af4f553b27 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -240,7 +240,6 @@ func makeQueueContainer(rev *v1alpha1.Revision, loggingConfig *logging.Config, o Ports: ports, ReadinessProbe: makeQueueProbe(rp), VolumeMounts: volumeMounts, - Args: []string{"--readiness-probe", probeJSON}, SecurityContext: queueSecurityContext, Env: []corev1.EnvVar{{ Name: "SERVING_NAMESPACE", @@ -310,6 +309,9 @@ func makeQueueContainer(rev *v1alpha1.Revision, loggingConfig *logging.Config, o }, { Name: "INTERNAL_VOLUME_PATH", Value: internalVolumePath, + }, { + Name: "SERVING_READINESS_PROBE", + Value: probeJSON, }}, } } diff --git a/pkg/reconciler/revision/resources/queue_test.go b/pkg/reconciler/revision/resources/queue_test.go index 00e5ac75a22a..7f0b7d77af90 100644 --- a/pkg/reconciler/revision/resources/queue_test.go +++ b/pkg/reconciler/revision/resources/queue_test.go @@ -373,8 +373,12 @@ func TestMakeQueueContainer(t *testing.T) { } got := makeQueueContainer(test.rev, test.lc, test.oc, test.ac, test.cc) + test.want.Env = append(test.want.Env, corev1.EnvVar{ + Name: "SERVING_READINESS_PROBE", + Value: probeJSON(test.rev.Spec.GetContainer().ReadinessProbe), + }) sortEnv(got.Env) - test.want.Args = probeArgs(test.want.Args, test.rev.Spec.GetContainer().ReadinessProbe) + sortEnv(test.want.Env) if diff := cmp.Diff(test.want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { t.Errorf("makeQueueContainer (-want, +got) = %v", diff) } @@ -618,8 +622,12 @@ func TestMakeQueueContainerWithPercentageAnnotation(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { got := makeQueueContainer(test.rev, test.lc, test.oc, test.ac, test.cc) + test.want.Env = append(test.want.Env, corev1.EnvVar{ + Name: "SERVING_READINESS_PROBE", + Value: probeJSON(test.rev.Spec.GetContainer().ReadinessProbe), + }) sortEnv(got.Env) - test.want.Args = probeArgs(test.want.Args, test.rev.Spec.GetContainer().ReadinessProbe) + sortEnv(test.want.Env) if diff := cmp.Diff(test.want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { t.Errorf("makeQueueContainerWithPercentageAnnotation (-want, +got) = %v", diff) } @@ -684,10 +692,6 @@ func TestProbeGenerationHTTPDefaults(t *testing.T) { PeriodSeconds: 1, TimeoutSeconds: 10, } - probeBytes, err := json.Marshal(expectedProbe) - if err != nil { - t.Fatalf("Failed to marshall readiness probe %#v", err) - } lc := &logging.Config{} oc := &metrics.ObservabilityConfig{} @@ -708,9 +712,10 @@ func TestProbeGenerationHTTPDefaults(t *testing.T) { TimeoutSeconds: 10, }, // These changed based on the Revision and configs passed in. - Env: env(map[string]string{}), + Env: env(map[string]string{ + "SERVING_READINESS_PROBE": probeJSON(expectedProbe), + }), SecurityContext: queueSecurityContext, - Args: []string{"--readiness-probe", string(probeBytes)}, } got := makeQueueContainer(rev, lc, oc, ac, cc) @@ -772,10 +777,6 @@ func TestProbeGenerationHTTP(t *testing.T) { PeriodSeconds: 2, TimeoutSeconds: 10, } - probeBytes, err := json.Marshal(expectedProbe) - if err != nil { - t.Fatalf("Failed to marshall readiness probe %#v", err) - } lc := &logging.Config{} oc := &metrics.ObservabilityConfig{} @@ -796,9 +797,8 @@ func TestProbeGenerationHTTP(t *testing.T) { TimeoutSeconds: 10, }, // These changed based on the Revision and configs passed in. - Env: env(map[string]string{"USER_PORT": strconv.Itoa(userPort)}), + Env: env(map[string]string{"USER_PORT": strconv.Itoa(userPort), "SERVING_READINESS_PROBE": probeJSON(expectedProbe)}), SecurityContext: queueSecurityContext, - Args: []string{"--readiness-probe", string(probeBytes)}, } got := makeQueueContainer(rev, lc, oc, ac, cc) @@ -980,12 +980,6 @@ func TestTCPProbeGeneration(t *testing.T) { oc := &metrics.ObservabilityConfig{} ac := &autoscaler.Config{} cc := &deployment.Config{} - probeBytes, err := json.Marshal(test.wantProbe) - if err != nil { - t.Fatalf("Failed to marshall readiness probe %#v", err) - } - test.want.Args = []string{"--readiness-probe", string(probeBytes)} - testRev := &v1alpha1.Revision{ ObjectMeta: metav1.ObjectMeta{ Namespace: "foo", @@ -994,9 +988,14 @@ func TestTCPProbeGeneration(t *testing.T) { }, Spec: test.rev, } + test.want.Env = append(test.want.Env, corev1.EnvVar{ + Name: "SERVING_READINESS_PROBE", + Value: probeJSON(test.wantProbe), + }) got := makeQueueContainer(testRev, lc, oc, ac, cc) sortEnv(got.Env) + sortEnv(test.want.Env) if diff := cmp.Diff(test.want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { t.Errorf("makeQueueContainer (-want, +got) = %v", diff) } @@ -1025,16 +1024,13 @@ var defaultEnv = map[string]string{ "INTERNAL_VOLUME_PATH": internalVolumePath, } -func probeArgs(containerArgs []string, probe *corev1.Probe) []string { - probeArgs := []string{"--readiness-probe", "null"} +func probeJSON(probe *corev1.Probe) string { if probe != nil { - probeBytes, err := json.Marshal(probe) - if err != nil { - return append(containerArgs, probeArgs...) + if probeBytes, err := json.Marshal(probe); err == nil { + return string(probeBytes) } - return append(containerArgs, []string{"--readiness-probe", string(probeBytes)}...) } - return append(containerArgs, probeArgs...) + return "" } func env(overrides map[string]string) []corev1.EnvVar { From 7ab868142dd655099f0a754e66fec6aac0d30324 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Mon, 15 Jul 2019 20:45:26 -0700 Subject: [PATCH 6/9] Address comments - Use context for timeout - do not override exec probe - simplify the logic for errors when multiple probes are mentioned Signed-off-by: Shash Reddy --- cmd/queue/main.go | 11 +++----- pkg/apis/serving/k8s_validation.go | 25 ++++++++----------- pkg/apis/serving/k8s_validation_test.go | 3 +-- .../revision/resources/deploy_test.go | 2 +- pkg/reconciler/revision/resources/queue.go | 7 +----- 5 files changed, 16 insertions(+), 32 deletions(-) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 4c7f722c779c..3a7963d4be08 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -247,14 +247,9 @@ func probeQueueHealthPath(port int, timeoutSeconds int) error { }, Timeout: timeoutDuration, } - - stopCh := make(chan struct{}) - timeCh := time.After(timeoutDuration) - - go func() { - <-timeCh - close(stopCh) - }() + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + defer cancel() + stopCh := ctx.Done() var lastErr error timeoutErr := wait.PollImmediateUntil(aggressivePollInterval, func() (bool, error) { diff --git a/pkg/apis/serving/k8s_validation.go b/pkg/apis/serving/k8s_validation.go index ec840e49059e..b55cd9e04e63 100644 --- a/pkg/apis/serving/k8s_validation.go +++ b/pkg/apis/serving/k8s_validation.go @@ -467,30 +467,25 @@ func validateProbe(p *corev1.Probe) *apis.FieldError { h := p.Handler errs = errs.Also(apis.CheckDisallowedFields(h, *HandlerMask(&h))) - numHandlers := 0 + var handlers []string if h.HTTPGet != nil { - numHandlers++ + handlers = append(handlers, "httpGet") errs = errs.Also(apis.CheckDisallowedFields(*h.HTTPGet, *HTTPGetActionMask(h.HTTPGet))).ViaField("httpGet") } if h.TCPSocket != nil { - if numHandlers > 0 { - errs = errs.Also(apis.ErrDisallowedFields("tcpSocket")) - } else { - numHandlers++ - errs = errs.Also(apis.CheckDisallowedFields(*h.TCPSocket, *TCPSocketActionMask(h.TCPSocket))).ViaField("tcpSocket") - } + handlers = append(handlers, "tcpSocket") + errs = errs.Also(apis.CheckDisallowedFields(*h.TCPSocket, *TCPSocketActionMask(h.TCPSocket))).ViaField("tcpSocket") } if h.Exec != nil { - if numHandlers > 0 { - errs = errs.Also(apis.ErrDisallowedFields("exec")) - } else { - numHandlers++ - errs = errs.Also(apis.CheckDisallowedFields(*h.Exec, *ExecActionMask(h.Exec))).ViaField("exec") - } + handlers = append(handlers, "exec") + errs = errs.Also(apis.CheckDisallowedFields(*h.Exec, *ExecActionMask(h.Exec))).ViaField("exec") } - if numHandlers == 0 { + + if len(handlers) == 0 { errs = errs.Also(apis.ErrMissingField("handler")) + } else if len(handlers) > 1 { + errs = errs.Also(apis.ErrMultipleOneOf(handlers...)) } return errs } diff --git a/pkg/apis/serving/k8s_validation_test.go b/pkg/apis/serving/k8s_validation_test.go index 50e27f02b04d..899d78b51ad6 100644 --- a/pkg/apis/serving/k8s_validation_test.go +++ b/pkg/apis/serving/k8s_validation_test.go @@ -576,8 +576,7 @@ func TestContainerValidation(t *testing.T) { }, }, }, - want: apis.ErrDisallowedFields("readinessProbe.exec").Also( - apis.ErrDisallowedFields("readinessProbe.tcpSocket")), + want: apis.ErrMultipleOneOf("readinessProbe.exec", "readinessProbe.tcpSocket", "readinessProbe.httpGet"), }, { name: "invalid readiness http probe (has port)", c: corev1.Container{ diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go index d0d58761f398..12c08ffc8413 100644 --- a/pkg/reconciler/revision/resources/deploy_test.go +++ b/pkg/reconciler/revision/resources/deploy_test.go @@ -580,7 +580,7 @@ func TestMakePodSpec(t *testing.T) { withExecReadinessProbe([]string{"echo", "hello"})), queueContainer( withEnvVar("CONTAINER_CONCURRENCY", "0"), - withEnvVar("SERVING_READINESS_PROBE", `{"tcpSocket":{"port":8080,"host":"127.0.0.1"},"successThreshold":1}`), + withEnvVar("SERVING_READINESS_PROBE", "{}"), ), }), }, { diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index f0af4f553b27..26135e322400 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -336,12 +336,7 @@ func applyReadinessProbeDefaults(p *corev1.Probe, port int32) { p.TCPSocket.Host = localAddress p.TCPSocket.Port = intstr.FromInt(int(port)) case p.Exec != nil: - // Use default TCP connect probe to ensure data path is open from queue-proxy to - // user-container. User-defined ExecProbe will still be run on user-container. - p.TCPSocket = &corev1.TCPSocketAction{} - p.SuccessThreshold = 1 - p.TCPSocket.Host = localAddress - p.TCPSocket.Port = intstr.FromInt(int(port)) + //User-defined ExecProbe will still be run on user-container. p.Exec = nil } From dc9663d9f132c04f74aab25be7f86099a0aa9eaf Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Tue, 16 Jul 2019 09:48:56 -0400 Subject: [PATCH 7/9] use url.Parse in queue test Co-authored-by: Shash Reddy --- cmd/queue/main_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cmd/queue/main_test.go b/cmd/queue/main_test.go index b9ae21df7372..0cb0111f6928 100644 --- a/cmd/queue/main_test.go +++ b/cmd/queue/main_test.go @@ -244,11 +244,14 @@ func TestProbeQueueTimeout(t *testing.T) { defer ts.Close() - portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("%s is not a valid URL: %v", ts.URL, err) + } - port, err := strconv.Atoi(portStr) + port, err := strconv.Atoi(u.Port()) if err != nil { - t.Fatalf("failed to convert port(%s) to int", portStr) + t.Fatalf("failed to convert port(%s) to int", u.Port()) } timeout := 1 From 63295fe02f1ac49d91097e6c3f8654c860cf5023 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Tue, 16 Jul 2019 10:16:14 -0400 Subject: [PATCH 8/9] use 'probe-period' as flag in queue binary Co-authored-by: Shash Reddy --- cmd/queue/main.go | 2 +- pkg/reconciler/revision/resources/deploy_test.go | 2 +- pkg/reconciler/revision/resources/queue.go | 4 ++-- pkg/reconciler/revision/resources/queue_test.go | 12 ++++++------ 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 3a7963d4be08..669eb81cd2c6 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -114,7 +114,7 @@ var ( appResponseTimeInMsecN, "The response time in millisecond", stats.UnitMilliseconds) - readinessProbeTimeout = flag.Int("probe", -1, "run readiness probe with given timeout") + readinessProbeTimeout = flag.Int("probe-period", -1, "run readiness probe with given timeout") ) type config struct { diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go index 12c08ffc8413..2569e29e4666 100644 --- a/pkg/reconciler/revision/resources/deploy_test.go +++ b/pkg/reconciler/revision/resources/deploy_test.go @@ -75,7 +75,7 @@ var ( ReadinessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "0"}, + Command: []string{"/ko-app/queue", "-probe-period", "0"}, }, }, PeriodSeconds: 1, diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index 26135e322400..27dfbbae33ea 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -151,7 +151,7 @@ func makeQueueProbe(in *corev1.Probe) *corev1.Probe { out := &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "0"}, + Command: []string{"/ko-app/queue", "-probe-period", "0"}, }, }, // We want to mark the service as not ready as soon as the @@ -180,7 +180,7 @@ func makeQueueProbe(in *corev1.Probe) *corev1.Probe { return &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", strconv.Itoa(timeout)}, + Command: []string{"/ko-app/queue", "-probe-period", strconv.Itoa(timeout)}, }, }, PeriodSeconds: in.PeriodSeconds, diff --git a/pkg/reconciler/revision/resources/queue_test.go b/pkg/reconciler/revision/resources/queue_test.go index 7f0b7d77af90..5e9c8aa02598 100644 --- a/pkg/reconciler/revision/resources/queue_test.go +++ b/pkg/reconciler/revision/resources/queue_test.go @@ -50,7 +50,7 @@ import ( var defaultKnativeQReadinessProbe = &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "0"}, + Command: []string{"/ko-app/queue", "-probe-period", "0"}, }, }, // We want to mark the service as not ready as soon as the @@ -705,7 +705,7 @@ func TestProbeGenerationHTTPDefaults(t *testing.T) { ReadinessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "10"}, + Command: []string{"/ko-app/queue", "-probe-period", "10"}, }, }, PeriodSeconds: 1, @@ -790,7 +790,7 @@ func TestProbeGenerationHTTP(t *testing.T) { ReadinessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "10"}, + Command: []string{"/ko-app/queue", "-probe-period", "10"}, }, }, PeriodSeconds: 2, @@ -856,7 +856,7 @@ func TestTCPProbeGeneration(t *testing.T) { ReadinessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "0"}, + Command: []string{"/ko-app/queue", "-probe-period", "0"}, }, }, PeriodSeconds: 1, @@ -903,7 +903,7 @@ func TestTCPProbeGeneration(t *testing.T) { ReadinessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "1"}, + Command: []string{"/ko-app/queue", "-probe-period", "1"}, }, }, PeriodSeconds: 1, @@ -960,7 +960,7 @@ func TestTCPProbeGeneration(t *testing.T) { ReadinessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"/ko-app/queue", "-probe", "15"}, + Command: []string{"/ko-app/queue", "-probe-period", "15"}, }, }, PeriodSeconds: 2, From 8eaf076289814fa65351b77943b5af76094524e6 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Tue, 16 Jul 2019 12:14:47 -0700 Subject: [PATCH 9/9] Add comment for using pollImmediateUntil instead of pollImmediate Signed-off-by: Shash Reddy --- cmd/queue/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 669eb81cd2c6..06e5c5a75221 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -252,6 +252,8 @@ func probeQueueHealthPath(port int, timeoutSeconds int) error { stopCh := ctx.Done() var lastErr error + // Using PollImmediateUntil instead of PollImmediate because if timeout is reached while waiting for first + // invocation of conditionFunc, it exits immediately without trying for a second time. timeoutErr := wait.PollImmediateUntil(aggressivePollInterval, func() (bool, error) { var res *http.Response if res, lastErr = httpClient.Get(url); res == nil {