27 changes: 20 additions & 7 deletions cmd/activator/main.go
@@ -17,16 +17,17 @@ limitations under the License.
package main

import (
"bytes"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"time"

"bytes"
"io"
"io/ioutil"
"net/http/httputil"
"net/url"
"time"

"github.com/knative/serving/pkg/logging/logkey"

@@ -135,14 +136,26 @@ func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) {
http.Error(w, msg, int(status))
return
}

var transport http.RoundTripper
if endpoint.IsVerified() {
Member

It's probably still worth using the retryRoundTripper for HTTP GETs

Contributor Author

It's not clear why the retry approach is better when the user has defined the readinessProbe. The readinessProbe is a clear indicator of when the app is ready to receive requests.

Member @dprotaso, Jul 26, 2018

For GETs, a request could still fail for whatever reason even if the readiness probe succeeds.

Contributor

Once the user's application says it's ready, it will be put into service. That's true whether it's the first pod or the 100th. The only reason we're retrying at this level (in the activator) is because we know the network programming is eventually consistent, which matters more for the first pod.

Once we've verified we can reach the service with readiness probing, we should just forward the request. If it fails because of something in the user's application, we should just rely on a higher level retry (or not).

transport = http.DefaultTransport
if r.ProtoMajor == 2 {
transport = h2cutil.NewTransport()
}
} else {
transport = retryRoundTripper{
Member

The PR is meant to stop the activator from retrying HTTP POSTs, but a retry is still possible here if the endpoint isn't verified.

Contributor Author

The PR comment refers to this fact. This issue is being resolved with a multi-PR approach.

This initial PR avoids HTTP retries by the activator when the user has defined an HTTPGet readinessProbe. The follow-up PR will default to the queue-proxy health check when the user has not defined a readinessProbe.

Member

To clarify my comment: if the endpoint's verification status is Fail, it will go into the else block and use the retryRoundTripper for HTTP POSTs.
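
One possible way to address the concern in this thread, sketched under assumptions: a round tripper that retries transport errors only for methods that are safe to repeat, so a POST is attempted exactly once even when the endpoint is unverified. The names `idempotentRetryTripper`, `maxAttempts`, `countingTripper`, and `attemptsFor` are hypothetical and not part of this PR; the PR's own `retryRoundTripper` is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// idempotentRetryTripper is a hypothetical wrapper (not from this PR) that
// retries transport errors only for methods that are safe to repeat.
type idempotentRetryTripper struct {
	next        http.RoundTripper
	maxAttempts int
}

// isIdempotent reports whether repeating the method has no extra side effects.
func isIdempotent(method string) bool {
	switch method {
	case http.MethodGet, http.MethodHead, http.MethodOptions:
		return true
	}
	return false
}

func (t idempotentRetryTripper) RoundTrip(r *http.Request) (*http.Response, error) {
	attempts := 1 // non-idempotent methods such as POST get a single attempt
	if isIdempotent(r.Method) && t.maxAttempts > 1 {
		attempts = t.maxAttempts
	}
	var err error
	for i := 0; i < attempts; i++ {
		var resp *http.Response
		resp, err = t.next.RoundTrip(r)
		if err == nil {
			return resp, nil
		}
	}
	return nil, err
}

// countingTripper is a test double that always fails and counts its calls.
type countingTripper struct{ calls int }

func (c *countingTripper) RoundTrip(*http.Request) (*http.Response, error) {
	c.calls++
	return nil, errors.New("connection refused")
}

// attemptsFor returns how many times the wrapped transport was called.
func attemptsFor(method string) int {
	fake := &countingTripper{}
	rt := idempotentRetryTripper{next: fake, maxAttempts: 3}
	req, _ := http.NewRequest(method, "http://example.invalid/", nil)
	rt.RoundTrip(req)
	return fake.calls
}

func main() {
	fmt.Println(attemptsFor(http.MethodGet), attemptsFor(http.MethodPost))
}
```

The sketch retries immediately; a real transport would presumably wait between attempts and would need to treat requests with bodies carefully.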

logger: a.logger,
}
}

target := &url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port),
}

proxy := httputil.NewSingleHostReverseProxy(target)
proxy.Transport = retryRoundTripper{
logger: a.logger,
}
proxy.Transport = transport

// TODO: Clear the host to avoid 404's.
// https://github.com/knative/serving/issues/964
17 changes: 15 additions & 2 deletions pkg/activator/activator.go
@@ -37,6 +37,19 @@ type revisionID struct {

// Endpoint is a fully-qualified domain name / port pair for an active revision.
type Endpoint struct {
FQDN string
Port int32
FQDN string
Port int32
Verified VerificationStatus
Member

Verified reads like a state. I'd just call this Status

Contributor Author

Fair. Will change

}

type VerificationStatus string

const (
Unknown VerificationStatus = "Unknown"
Pass VerificationStatus = "Pass"
Fail VerificationStatus = "Fail"
)

func (e *Endpoint) IsVerified() bool {
return e.Verified == Pass
}
14 changes: 7 additions & 7 deletions pkg/activator/dedupe_test.go
@@ -25,7 +25,7 @@ import (
)

func TestSingleRevision_SingleRequest_Success(t *testing.T) {
want := Endpoint{"ip", 8080}
want := Endpoint{FQDN: "ip", Port: 8080}
f := newFakeActivator(t,
map[revisionID]activationResult{
revisionID{"default", "rev1"}: activationResult{
@@ -53,7 +53,7 @@ func TestSingleRevision_SingleRequest_Success(t *testing.T) {
}

func TestSingleRevision_MultipleRequests_Success(t *testing.T) {
ep := Endpoint{"ip", 8080}
ep := Endpoint{FQDN: "ip", Port: 8080}
f := newFakeActivator(t,
map[revisionID]activationResult{
revisionID{"default", "rev1"}: activationResult{
@@ -82,8 +82,8 @@ func TestSingleRevision_MultipleRequests_Success(t *testing.T) {
}

func TestMultipleRevisions_MultipleRequests_Success(t *testing.T) {
ep1 := Endpoint{"ip1", 8080}
ep2 := Endpoint{"ip2", 8080}
ep1 := Endpoint{FQDN: "ip1", Port: 8080}
ep2 := Endpoint{FQDN: "ip2", Port: 8080}
f := newFakeActivator(t,
map[revisionID]activationResult{
revisionID{"default", "rev1"}: activationResult{
@@ -121,7 +121,7 @@ func TestMultipleRevisions_MultipleRequests_Success(t *testing.T) {
}

func TestMultipleRevisions_MultipleRequests_PartialSuccess(t *testing.T) {
ep1 := Endpoint{"ip1", 8080}
ep1 := Endpoint{FQDN: "ip1", Port: 8080}
status2 := Status(http.StatusInternalServerError)
error2 := fmt.Errorf("test error")
f := newFakeActivator(t,
@@ -191,7 +191,7 @@ func TestSingleRevision_MultipleRequests_FailureRecovery(t *testing.T) {
}

// Later activation succeeds
successEp := Endpoint{"ip", 8080}
successEp := Endpoint{FQDN: "ip", Port: 8080}
successStatus := Status(0)
f.responses[revisionID{"default", "rev1"}] = activationResult{
endpoint: successEp,
@@ -216,7 +216,7 @@ func TestSingleRevision_MultipleRequests_FailureRecovery(t *testing.T) {
}

func TestShutdown_ReturnError(t *testing.T) {
ep := Endpoint{"ip", 8080}
ep := Endpoint{FQDN: "ip", Port: 8080}
f := newFakeActivator(t,
map[revisionID]activationResult{
revisionID{"default", "rev1"}: activationResult{
123 changes: 123 additions & 0 deletions pkg/activator/prober.go
@@ -0,0 +1,123 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package activator

import (
"errors"
"fmt"
"net/http"
"net/url"
"time"

"github.com/knative/serving/pkg/apis/serving/v1alpha1"
"go.uber.org/zap"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

const (
maxRetry = 60
// Defaults are plain second counts; callers multiply them by time.Second.
defaultPeriodSeconds = int32(1)
defaultTimeoutSeconds = int32(1)
)

func verifyRevisionRoutability(revision *v1alpha1.Revision, endpoint *Endpoint, logger *zap.SugaredLogger) {
Member

It's not clear that you're using a reference to an Endpoint in order to mutate it. It might be better to not mutate the value and return a VerificationStatus instead. Then make the endpoint parameter a non-reference, i.e. endpoint Endpoint, so you're not seeing &endpoint everywhere.

Contributor Author

Well, we could change the function name to verifyEndpointStatus, which would make it more obvious :) But yeah, I do agree with your suggestion. Will alter.
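
A minimal sketch of the shape suggested in this thread, assuming the field is renamed to `Status` as discussed elsewhere in the review: the function takes the endpoint by value and returns a `VerificationStatus`, and the caller does the assignment. `verifyEndpoint`, its parameters, and the injected `probe` function are hypothetical, chosen only to keep the example self-contained.

```go
package main

import "fmt"

type VerificationStatus string

const (
	Unknown VerificationStatus = "Unknown"
	Pass    VerificationStatus = "Pass"
	Fail    VerificationStatus = "Fail"
)

// Endpoint mirrors the struct in this PR, with the field named Status.
type Endpoint struct {
	FQDN   string
	Port   int32
	Status VerificationStatus
}

// verifyEndpoint is a hypothetical, side-effect-free variant: it never
// mutates ep and reports the outcome through its return value.
func verifyEndpoint(ep Endpoint, hasHTTPGetProbe bool, maxAttempts int, probe func(Endpoint) bool) VerificationStatus {
	if !hasHTTPGetProbe {
		return Unknown // nothing to probe, so the status cannot be decided
	}
	for i := 0; i < maxAttempts; i++ {
		if probe(ep) {
			return Pass
		}
	}
	return Fail
}

func main() {
	ep := Endpoint{FQDN: "rev1.default.svc.cluster.local", Port: 8080}
	alwaysReady := func(Endpoint) bool { return true }
	// The caller decides where the result is stored.
	ep.Status = verifyEndpoint(ep, true, 3, alwaysReady)
	fmt.Println(ep.Status)
}
```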

// Proceed only if user has a HTTPGet readinessProbe defined
if revision.Spec.Container.ReadinessProbe == nil ||
revision.Spec.Container.ReadinessProbe.HTTPGet == nil {
endpoint.Verified = Unknown
return
}

endpoint.Verified = Fail
probe := createHttpGetProbe(revision, *endpoint)

// Number of seconds after the readiness probes are initiated
time.Sleep(time.Second * int32ToDuration(probe.InitialDelaySeconds))
Member

We essentially know the endpoint is ready, but not necessarily from all nodes. I'm thinking we can skip the initial delay and hope that it'll optimistically work.

Contributor Author

We did have some hesitation in using it. But the thinking was that a user directive to apply an initial delay should be respected. The default initial delay is 0 seconds.

As an example, assume for an app, the actual initial delay is longer than 60 seconds; then the endpoint verification would fail if the default retry interval of 1 second and max retry limit of 60 is used.


retryCount := 1
retryInterval := time.Second * int32ToDuration(probe.PeriodSeconds)

for retryCount = 1; retryCount < maxRetry; retryCount++ {
Contributor

I would prefer to go with an exponential backoff. E.g. #1814. Let's see if you can reuse @markusthoemmes' retry mechanism. Or if his mechanism can be modified for you to use it.
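
For illustration only, a generic exponential backoff loop might look like the sketch below. It is not the retry mechanism from #1814; `retryWithBackoff`, `backoffDelaysMillis`, the 100ms initial delay, and the factor of 2 are all assumptions made for this example. The sleep function is injected so the schedule can be inspected without actually waiting.

```go
package main

import (
	"fmt"
	"time"
)

// retryWithBackoff calls check up to maxAttempts times, multiplying the wait
// between attempts by factor. It returns the number of attempts made and
// whether check eventually succeeded.
func retryWithBackoff(maxAttempts int, initial time.Duration, factor float64,
	sleep func(time.Duration), check func() bool) (int, bool) {
	delay := initial
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if check() {
			return attempt, true
		}
		if attempt < maxAttempts { // no point sleeping after the last attempt
			sleep(delay)
			delay = time.Duration(float64(delay) * factor)
		}
	}
	return maxAttempts, false
}

// backoffDelaysMillis records the waits (in ms) that would occur when check
// first succeeds on attempt number succeedOn.
func backoffDelaysMillis(succeedOn, maxAttempts int) []int64 {
	var delays []int64
	calls := 0
	record := func(d time.Duration) { delays = append(delays, d.Milliseconds()) }
	check := func() bool { calls++; return calls == succeedOn }
	retryWithBackoff(maxAttempts, 100*time.Millisecond, 2.0, record, check)
	return delays
}

func main() {
	// Waits recorded before a successful 4th attempt.
	fmt.Println(backoffDelaysMillis(4, 10))
}
```

In production code `time.Sleep` would be passed as the sleep function, and a cap on the maximum delay is usually worth adding.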

ready, err := checkHttpGetProbe(probe, logger)
if err != nil {
logger.Errorw("error checking probe", zap.Error(err))
}

if ready {
endpoint.Verified = Pass
break
}

// How often (in seconds) to perform the probe
time.Sleep(retryInterval)
}

logger.Infof("took %d probe retries for readiness of endpoint %+v", retryCount, endpoint)
}

// Function creates HTTP readiness probe for revision
func createHttpGetProbe(revision *v1alpha1.Revision, endpoint Endpoint) *v1.Probe {
Member @dprotaso, Jul 25, 2018

You're transforming a (revision, endpoint) to a probe. Then in the subsequent steps you're converting the probe to an http get call/request.

You can go straight to a (revision, endpoint) -> http.Request/call

Contributor Author @sukhil-suresh, Jul 25, 2018

Fair. This approach is a remnant of the initial effort to tackle both HttpGet and TCPSocket based readiness probes. Support for TCPSocket probe is currently blocked by issue #1241. More details in this issue comment.

Will update.
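
A rough sketch of the direct (revision, endpoint) to request mapping suggested above, skipping the intermediate `v1.Probe`. To stay self-contained it takes the FQDN, port, and probe path as plain arguments rather than the PR's `Revision` and `Endpoint` types; `newProbeRequest` and `probeURL` are hypothetical names, and the scheme is hard-coded to `http` purely as an assumption.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"net/url"
	"strconv"
)

// newProbeRequest builds the readiness GET request directly from the
// endpoint coordinates and the user-supplied probe path.
func newProbeRequest(fqdn string, port int32, path string) (*http.Request, error) {
	u := url.URL{
		Scheme: "http", // assumption: probes are plain HTTP
		Host:   net.JoinHostPort(fqdn, strconv.Itoa(int(port))),
		Path:   path,
	}
	return http.NewRequest(http.MethodGet, u.String(), nil)
}

// probeURL returns the URL the probe would hit, or "" if it cannot be built.
func probeURL(fqdn string, port int32, path string) string {
	req, err := newProbeRequest(fqdn, port, path)
	if err != nil {
		return ""
	}
	return req.URL.String()
}

func main() {
	fmt.Println(probeURL("rev1.default.svc.cluster.local", 8080, "/healthz"))
}
```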

probe := revision.Spec.Container.ReadinessProbe.DeepCopy()
probe.HTTPGet.Scheme = "http"
Member

You're making an assumption about the scheme; please confirm it's always http.

You can use the defaulter function to set the default value for this and the path
https://github.com/kubernetes/kubernetes/blob/release-1.11/pkg/apis/core/v1/defaults.go#L294

Contributor Author

Yes, will make the change. I did consider using the defaulter functions from the kubernetes library. Can't remember why I backed out of it :-/
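
As a stand-in for the Kubernetes defaulter linked above, the sketch below applies the same two defaults by hand: an empty path becomes `/` and an empty scheme becomes `HTTP`. The `httpGet` struct, `applyHTTPGetDefaults`, and `urlScheme` are hypothetical local names so the example runs without the Kubernetes packages; real code would presumably call the upstream defaulting function instead.

```go
package main

import (
	"fmt"
	"strings"
)

// httpGet is a local stand-in for the scheme and path fields of a probe's
// HTTPGet action.
type httpGet struct {
	Scheme string
	Path   string
}

// applyHTTPGetDefaults fills in an empty path and scheme, leaving any
// user-provided values untouched.
func applyHTTPGetDefaults(g httpGet) httpGet {
	if g.Path == "" {
		g.Path = "/"
	}
	if g.Scheme == "" {
		g.Scheme = "HTTP" // Kubernetes spells URI schemes in upper case
	}
	return g
}

// urlScheme returns the defaulted scheme in the lower-case form used in URLs.
func urlScheme(g httpGet) string {
	return strings.ToLower(applyHTTPGetDefaults(g).Scheme)
}

func main() {
	g := applyHTTPGetDefaults(httpGet{})
	fmt.Println(urlScheme(g), g.Path)
}
```

Defaulting rather than overwriting keeps a user-specified `HTTPS` probe intact, which is the assumption the reviewer asks to confirm.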

probe.HTTPGet.Host = endpoint.FQDN
probe.HTTPGet.Port.Type = intstr.Int
probe.HTTPGet.Port.IntVal = endpoint.Port

if probe.TimeoutSeconds == 0 {
probe.TimeoutSeconds = defaultTimeoutSeconds
}
if probe.PeriodSeconds == 0 {
probe.PeriodSeconds = defaultPeriodSeconds
}

return probe
}

func checkHttpGetProbe(probe *v1.Probe, logger *zap.SugaredLogger) (ready bool, err error) {
if probe == nil {
return false, errors.New("probe cannot be nil")
}

if probe.HTTPGet == nil {
return false, errors.New("probe HTTPGet cannot be nil")
}

host := fmt.Sprintf("%s:%d", probe.HTTPGet.Host, probe.HTTPGet.Port.IntVal)

probeUrl := url.URL{
Scheme: string(probe.HTTPGet.Scheme),
Host: host,
Path: probe.HTTPGet.Path,
}

logger.Debugf("checking probe url: %s", probeUrl.String())

client := http.Client{Timeout: time.Second * int32ToDuration(probe.TimeoutSeconds)}
res, err := client.Get(probeUrl.String())
if err != nil {
return false, err
}
// Close the body so the underlying connection can be released.
defer res.Body.Close()

return res.StatusCode == http.StatusOK, nil
}

func int32ToDuration(i int32) time.Duration {
return time.Duration(int64(i))
}
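
A note on units, with a small sketch: `int32ToDuration` returns a bare count, so every caller has to remember to multiply by `time.Second`. A helper that does the multiplication itself keeps the unit in one place. `secondsToDuration` is a hypothetical name, not part of this PR. The second print is a reminder that a `time.Duration` is a nanosecond count, so converting `time.Second` to an integer does not yield 1.

```go
package main

import (
	"fmt"
	"time"
)

// secondsToDuration converts a Kubernetes-style seconds field (for example
// PeriodSeconds or TimeoutSeconds) into a time.Duration.
func secondsToDuration(s int32) time.Duration {
	return time.Duration(s) * time.Second
}

func main() {
	fmt.Println(secondsToDuration(5)) // prints "5s"
	fmt.Println(int32(time.Second))   // prints "1000000000"
}
```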