Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 52 additions & 60 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ import (

"github.com/kelseyhightower/envconfig"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/pkg/logging/logkey"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
"knative.dev/serving/pkg/activator"
activatorutil "knative.dev/serving/pkg/activator/util"
"knative.dev/serving/pkg/apis/networking"
Expand All @@ -40,16 +46,8 @@ import (
"knative.dev/serving/pkg/network"
"knative.dev/serving/pkg/queue"
"knative.dev/serving/pkg/queue/health"
"knative.dev/serving/pkg/queue/readiness"
queuestats "knative.dev/serving/pkg/queue/stats"

"go.opencensus.io/stats"
"go.uber.org/zap"

"knative.dev/pkg/logging/logkey"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"

"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand All @@ -67,10 +65,6 @@ const (
// in the mesh.
quitSleepDuration = 20 * time.Second

// Set equal to the queue-proxy's ExecProbe timeout to take
// advantage of the full window
probeTimeout = 10 * time.Second

badProbeTemplate = "unexpected probe header value: %s"

// Metrics' names (without component prefix).
Expand All @@ -84,6 +78,11 @@ const (
requestQueueHealthPath = "/health"

healthURLTemplate = "http://127.0.0.1:%d" + requestQueueHealthPath
tcpProbeTimeout = 100 * time.Millisecond
// The 25 millisecond retry interval is an unscientific compromise between wanting to get
// started as early as possible while still wanting to give the container some breathing
// room to get up and running.
aggressivePollInterval = 25 * time.Millisecond
)

var (
Expand All @@ -98,8 +97,6 @@ var (
healthState = &health.State{}
promStatReporter *queue.PrometheusStatsReporter // Prometheus stats reporter.

probe = flag.Bool("probe", false, "run readiness probe")

// Metric counters.
requestCountM = stats.Int64(
requestCountN,
Expand All @@ -117,6 +114,7 @@ var (
appResponseTimeInMsecN,
"The response time in millisecond",
stats.UnitMilliseconds)
readinessProbeTimeout = flag.Int("probe-period", -1, "run readiness probe with given timeout")
)

type config struct {
Expand All @@ -138,6 +136,7 @@ type config struct {
ServingLoggingLevel string `split_words:"true" required:"true"`
ServingRequestMetricsBackend string `split_words:"true" required:"true"`
ServingRequestLogTemplate string `split_words:"true" required:"true"`
ServingReadinessProbe string `split_words:"true" required:"true"`
}

func initConfig(env config) {
Expand Down Expand Up @@ -174,29 +173,8 @@ func knativeProxyHeader(r *http.Request) string {
return r.Header.Get(network.ProxyHeaderName)
}

func probeUserContainer() bool {
var err error
wait.PollImmediate(50*time.Millisecond, probeTimeout, func() (bool, error) {
logger.Debug("TCP probing the user-container.")
config := health.TCPProbeConfigOptions{
Address: userTargetAddress,
SocketTimeout: 100 * time.Millisecond,
}
err = health.TCPProbe(config)
return err == nil, nil
})

if err == nil {
logger.Info("User-container successfully probed.")
} else {
logger.Errorw("User-container could not be probed successfully.", zap.Error(err))
}

return err == nil
}

// Make handler a closure for testing.
func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.Handler) func(http.ResponseWriter, *http.Request) {
func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.Handler, prober func() bool) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ph := knativeProbeHeader(r)
switch {
Expand All @@ -205,12 +183,17 @@ func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.H
http.Error(w, fmt.Sprintf(badProbeTemplate, ph), http.StatusBadRequest)
return
}
if probeUserContainer() {
// Respond with the name of the component handling the request.
w.Write([]byte(queue.Name))
if prober != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in a separate PR: Is there a reason why we don't return the state from healthState here? Seems unnecessarily redundant to probe on this path 🤔

@greghaynes do you need that for your "direct to ip" work?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems like an excellent suggestion. 👍

if prober() {
// Respond with the name of the component handling the request.
w.Write([]byte(queue.Name))
} else {
http.Error(w, "container not ready", http.StatusServiceUnavailable)
}
} else {
http.Error(w, "container not ready", http.StatusServiceUnavailable)
http.Error(w, "no probe", http.StatusInternalServerError)
}

return
case network.IsKubeletProbe(r):
// Do not count health checks for concurrency metrics
Expand Down Expand Up @@ -243,40 +226,42 @@ func handler(reqChan chan queue.ReqEvent, breaker *queue.Breaker, handler http.H
}

// Sets up /health and /wait-for-drain endpoints.
func createAdminHandlers() *http.ServeMux {
func createAdminHandlers(p *readiness.Probe) *http.ServeMux {
mux := http.NewServeMux()
// TODO(@joshrider): temporary change while waiting on other PRs to merge (See #4014)
mux.HandleFunc(requestQueueHealthPath, healthState.HealthHandler(probeUserContainer, true /*isNotAggressive*/))

mux.HandleFunc(requestQueueHealthPath, healthState.HealthHandler(p.ProbeContainer, p.IsAggressive()))
mux.HandleFunc(queue.RequestQueueDrainPath, healthState.DrainHandler())

return mux
}

func probeQueueHealthPath(port int, timeout time.Duration) error {
func probeQueueHealthPath(port int, timeoutSeconds int) error {
url := fmt.Sprintf(healthURLTemplate, port)

timeoutDuration := readiness.PollTimeout
if timeoutSeconds != 0 {
timeoutDuration = time.Duration(timeoutSeconds) * time.Second
}
httpClient := &http.Client{
Transport: &http.Transport{
// Do not use the cached connection
DisableKeepAlives: true,
Comment thread
mattmoor marked this conversation as resolved.
Outdated
},
Timeout: timeout,
Timeout: timeoutDuration,
}
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
Comment thread
mattmoor marked this conversation as resolved.
defer cancel()
stopCh := ctx.Done()

var lastErr error

// The 25 millisecond retry interval is an unscientific compromise between wanting to get
// started as early as possible while still wanting to give the container some breathing
// room to get up and running.
timeoutErr := wait.PollImmediate(25*time.Millisecond, timeout, func() (bool, error) {
// Using PollImmediateUntil instead of PollImmediate because if timeout is reached while waiting for first
// invocation of conditionFunc, it exits immediately without trying for a second time.
timeoutErr := wait.PollImmediateUntil(aggressivePollInterval, func() (bool, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I know Matt suggested it I liked the previous version more, it's shorter :)
🤷‍♀

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What'd I suggest?

var res *http.Response
if res, lastErr = httpClient.Get(url); res == nil {
return false, nil
}
defer res.Body.Close()

return res.StatusCode == http.StatusOK, nil
})
return health.IsHTTPProbeReady(res), nil
}, stopCh)

if lastErr != nil {
return errors.Wrap(lastErr, "failed to probe")
Expand All @@ -293,8 +278,8 @@ func probeQueueHealthPath(port int, timeout time.Duration) error {
func main() {
flag.Parse()

if *probe {
if err := probeQueueHealthPath(networking.QueueAdminPort, probeTimeout); err != nil {
if *readinessProbeTimeout >= 0 {
if err := probeQueueHealthPath(networking.QueueAdminPort, *readinessProbeTimeout); err != nil {
// used instead of the logger to produce a concise event message
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
Expand Down Expand Up @@ -356,9 +341,16 @@ func main() {
StatChan: statChan,
}, time.Now())

coreProbe, err := readiness.DecodeProbe(env.ServingReadinessProbe)
if err != nil {
logger.Fatalw("Queue container failed to parse readiness probe", zap.Error(err))
}

rp := readiness.NewProbe(coreProbe, logger.With(zap.String(logkey.Key, "readinessProbe")))

adminServer := &http.Server{
Addr: ":" + strconv.Itoa(networking.QueueAdminPort),
Handler: createAdminHandlers(),
Handler: createAdminHandlers(rp),
}

metricsSupported := false
Expand All @@ -379,7 +371,7 @@ func main() {
if metricsSupported {
composedHandler = pushRequestMetricHandler(httpProxy, appRequestCountM, appResponseTimeInMsecM, env)
}
composedHandler = http.HandlerFunc(handler(reqChan, breaker, composedHandler))
composedHandler = http.HandlerFunc(handler(reqChan, breaker, composedHandler, rp.ProbeContainer))
composedHandler = queue.ForwardedShimHandler(composedHandler)
composedHandler = queue.TimeToFirstByteTimeoutHandler(composedHandler,
time.Duration(env.RevisionTimeoutSeconds)*time.Second, "request timeout")
Expand Down
Loading