Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"net/http/httputil"
"net/url"

"github.com/golang/glog"
"github.com/knative/serving/pkg/activator"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
"github.com/knative/serving/pkg/controller"
"github.com/knative/serving/pkg/signals"
"github.com/golang/glog"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand All @@ -44,12 +44,13 @@ func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) {
return
}
target := &url.URL{
// TODO: Wire Activator into Istio mesh to support TLS
// (https://github.com/knative/serving/issues/838)
Scheme: "http",
Host: fmt.Sprintf("%s:%d", endpoint.IP, endpoint.Port),
Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port),
}
proxy := httputil.NewSingleHostReverseProxy(target)
// TODO: Clear the host to avoid 404's.
// https://github.com/elafros/elafros/issues/964
r.Host = ""
proxy.ServeHTTP(w, r)
}

Expand Down
2 changes: 2 additions & 0 deletions config/100-namespace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ apiVersion: v1
kind: Namespace
metadata:
name: knative-serving-system
labels:
istio-injection: enabled
Copy link
Copy Markdown

@ZhiminXiang ZhiminXiang Jun 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By adding this label, seems like Istio sidecar will be injected into all of pods under knative-serving-system (including controller, webhook, etc.) according to my experiment. Is this what we want in this PR?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not. I'll work on it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know — it looks like you enable sidecar injection for an entire namespace: https://istio.io/docs/setup/kubernetes/sidecar-injection/#deploying-an-app

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I might need to disable it on a case by case basis.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There, that should do it.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah — or maybe we can move the activator and autoscaler into a separate namespace? Just a thought :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave that for when we make the autoscaler multitenant

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://istio.io/docs/setup/kubernetes/sidecar-injection/#policy suggests that it should be possible to have this governed by an annotation, but I haven't been able to make this work with experimentation.

2 changes: 2 additions & 0 deletions config/activator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ spec:
replicas: 1
template:
metadata:
annotations:
sidecar.istio.io/inject: "true"
labels:
app: activator
role: activator
Expand Down
2 changes: 2 additions & 0 deletions config/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ spec:
replicas: 1
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
labels:
app: controller
spec:
Expand Down
2 changes: 2 additions & 0 deletions config/webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ spec:
replicas: 1
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
labels:
app: webhook
role: webhook
Expand Down
4 changes: 2 additions & 2 deletions pkg/activator/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type revisionID struct {
name string
}

// Endpoint is an ip, port pair for an active revision.
// Endpoint is a fully-qualified domain name / port pair for an active revision.
type Endpoint struct {
IP string
FQDN string
Port int32
}
51 changes: 26 additions & 25 deletions pkg/activator/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var _ Activator = (*revisionActivator)(nil)
type revisionActivator struct {
readyTimout time.Duration // for testing
kubeClient kubernetes.Interface
elaClient clientset.Interface
knaClient clientset.Interface
}

// NewRevisionActivator creates an Activator that changes revision
Expand All @@ -43,7 +43,7 @@ func NewRevisionActivator(kubeClient kubernetes.Interface, elaClient clientset.I
return &revisionActivator{
readyTimout: 60 * time.Second,
kubeClient: kubeClient,
elaClient: elaClient,
knaClient: elaClient,
}
}

Expand All @@ -60,7 +60,7 @@ func (r *revisionActivator) ActiveEndpoint(namespace, name string) (end Endpoint
}

// Get the current revision serving state
revisionClient := r.elaClient.ServingV1alpha1().Revisions(rev.namespace)
revisionClient := r.knaClient.ServingV1alpha1().Revisions(rev.namespace)
revision, err := revisionClient.Get(rev.name, metav1.GetOptions{})
if err != nil {
return internalError("Unable to get revision %s/%s: %v", rev.namespace, rev.name, err)
Expand All @@ -84,7 +84,7 @@ func (r *revisionActivator) ActiveEndpoint(namespace, name string) (end Endpoint

// Wait for the revision to be ready
if !revision.Status.IsReady() {
wi, err := r.elaClient.ServingV1alpha1().Revisions(rev.namespace).Watch(metav1.ListOptions{
wi, err := r.knaClient.ServingV1alpha1().Revisions(rev.namespace).Watch(metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", rev.name),
})
if err != nil {
Expand All @@ -102,7 +102,16 @@ func (r *revisionActivator) ActiveEndpoint(namespace, name string) (end Endpoint
if !revision.Status.IsReady() {
log.Printf("Revision %s/%s is not yet ready", rev.namespace, rev.name)
continue
} else {
log.Printf("Revision %s/%s is ready", rev.namespace, rev.name)
}
// After a pod goes ready, there is a poll loop to publish that fact, then there are
// controllers that wake up to propagate the info to each node which configures iptables on
// a max frequency loop, so it's always possible that there's a small delay.
// The delay should be O(seconds) max, most of the time.
// TODO: rely on readinessProbe instead of a hard-coded sleep.
// https://github.com/elafros/elafros/issues/974
time.Sleep(2 * time.Second)
break RevisionReady
} else {
return internalError("Unexpected result type for revision %s/%s: %v", rev.namespace, rev.name, event)
Expand All @@ -112,33 +121,25 @@ func (r *revisionActivator) ActiveEndpoint(namespace, name string) (end Endpoint
}

// Get the revision endpoint
//
// TODO: figure out why do we need to use the pod IP directly to avoid the delay.
// We should be able to use the k8s service cluster IP.
// https://github.com/knative/serving/issues/660
endpointName := controller.GetElaK8SServiceNameForRevision(revision)
k8sEndpoint, err := r.kubeClient.CoreV1().Endpoints(rev.namespace).Get(endpointName, metav1.GetOptions{})
services := r.kubeClient.CoreV1().Services(revision.GetNamespace())
serviceName := controller.GetElaK8SServiceNameForRevision(revision)
svc, err := services.Get(serviceName, metav1.GetOptions{})
if err != nil {
return internalError("Unable to get endpoint %s for revision %s/%s: %v",
endpointName, rev.namespace, rev.name, err)
return internalError("Unable to get service %s for revision %s/%s: %v",
serviceName, rev.namespace, rev.name, err)
}
if len(k8sEndpoint.Subsets) != 1 {
return internalError("Revision %s/%s needs one endpoint subset. Found %v", rev.namespace, rev.name, len(k8sEndpoint.Subsets))
}
subset := k8sEndpoint.Subsets[0]
if len(subset.Addresses) != 1 || len(subset.Ports) != 1 {
return internalError("Revision %s/%s needs one endpoint address and port. Found %v addresses and %v ports",
rev.namespace, rev.name, len(subset.Addresses), len(subset.Ports))
}
ip := subset.Addresses[0].IP
port := subset.Ports[0].Port
if ip == "" || port == 0 {
return internalError("Invalid ip %q or port %q for revision %s/%s", ip, port, rev.namespace, rev.name)

// TODO: in the future, the target service could have more than one port.
// https://github.com/elafros/elafros/issues/837
if len(svc.Spec.Ports) != 1 {
return internalError("Revision %s/%s needs one port. Found %v", rev.namespace, rev.name, len(svc.Spec.Ports))
}
fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, revision.Namespace)
port := svc.Spec.Ports[0].Port

// Return the endpoint and active=true
end = Endpoint{
IP: ip,
FQDN: fqdn,
Port: port,
}
return end, 0, nil
Expand Down
103 changes: 49 additions & 54 deletions pkg/activator/revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,29 @@ import (

"github.com/knative/serving/pkg/apis/serving/v1alpha1"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
fakeEla "github.com/knative/serving/pkg/client/clientset/versioned/fake"
fakeKna "github.com/knative/serving/pkg/client/clientset/versioned/fake"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
fakeK8s "k8s.io/client-go/kubernetes/fake"
)

const (
testNamespace = "test-namespace"
testRevision = "test-rev"
testNamespace = "test-namespace"
testRevision = "test-rev"
testService = testRevision + "-service"
testServiceFQDN = testService + "." + testNamespace + ".svc.cluster.local"
)

func TestActiveEndpoint_Active_StaysActive(t *testing.T) {
k8s, ela := fakeClients()
ela.ServingV1alpha1().Revisions(testNamespace).Create(newRevisionBuilder().build())
k8s.CoreV1().Endpoints(testNamespace).Create(newEndpointBuilder().build())
a := NewRevisionActivator(k8s, ela)
k8s, kna := fakeClients()
kna.ServingV1alpha1().Revisions(testNamespace).Create(newRevisionBuilder().build())
k8s.CoreV1().Services(testNamespace).Create(newServiceBuilder().build())
a := NewRevisionActivator(k8s, kna)

got, status, err := a.ActiveEndpoint(testNamespace, testRevision)

want := Endpoint{"ip", 8080}
want := Endpoint{testServiceFQDN, 8080}
if got != want {
t.Errorf("Wrong endpoint. Want %+v. Got %+v.", want, got)
}
Expand All @@ -55,17 +57,17 @@ func TestActiveEndpoint_Active_StaysActive(t *testing.T) {
}

func TestActiveEndpoint_Reserve_BecomesActive(t *testing.T) {
k8s, ela := fakeClients()
ela.ServingV1alpha1().Revisions(testNamespace).Create(
k8s, kna := fakeClients()
kna.ServingV1alpha1().Revisions(testNamespace).Create(
newRevisionBuilder().
withServingState(v1alpha1.RevisionServingStateReserve).
build())
k8s.CoreV1().Endpoints(testNamespace).Create(newEndpointBuilder().build())
a := NewRevisionActivator(k8s, ela)
k8s.CoreV1().Services(testNamespace).Create(newServiceBuilder().build())
a := NewRevisionActivator(k8s, kna)

got, status, err := a.ActiveEndpoint(testNamespace, testRevision)

want := Endpoint{"ip", 8080}
want := Endpoint{testServiceFQDN, 8080}
if got != want {
t.Errorf("Wrong endpoint. Want %+v. Got %+v.", want, got)
}
Expand All @@ -76,20 +78,20 @@ func TestActiveEndpoint_Reserve_BecomesActive(t *testing.T) {
t.Errorf("Unexpected error. Want nil. Got %v.", err)
}

rev, _ := ela.ServingV1alpha1().Revisions(testNamespace).Get(testRevision, metav1.GetOptions{})
rev, _ := kna.ServingV1alpha1().Revisions(testNamespace).Get(testRevision, metav1.GetOptions{})
if rev.Spec.ServingState != v1alpha1.RevisionServingStateActive {
t.Errorf("Unexpected serving state. Want Active. Got %v.", rev.Spec.ServingState)
}
}

func TestActiveEndpoint_Retired_StaysRetiredWithError(t *testing.T) {
k8s, ela := fakeClients()
ela.ServingV1alpha1().Revisions(testNamespace).Create(
k8s, kna := fakeClients()
kna.ServingV1alpha1().Revisions(testNamespace).Create(
newRevisionBuilder().
withServingState(v1alpha1.RevisionServingStateRetired).
build())
k8s.CoreV1().Endpoints(testNamespace).Create(newEndpointBuilder().build())
a := NewRevisionActivator(k8s, ela)
k8s.CoreV1().Services(testNamespace).Create(newServiceBuilder().build())
a := NewRevisionActivator(k8s, kna)

got, status, err := a.ActiveEndpoint(testNamespace, testRevision)

Expand All @@ -104,21 +106,21 @@ func TestActiveEndpoint_Retired_StaysRetiredWithError(t *testing.T) {
t.Errorf("Expected error. Want error. Got nil.")
}

rev, _ := ela.ServingV1alpha1().Revisions(testNamespace).Get(testRevision, metav1.GetOptions{})
rev, _ := kna.ServingV1alpha1().Revisions(testNamespace).Get(testRevision, metav1.GetOptions{})
if rev.Spec.ServingState != v1alpha1.RevisionServingStateRetired {
t.Errorf("Unexpected serving state. Want Retired. Got %v.", rev.Spec.ServingState)
}
}

func TestActiveEndpoint_Reserve_WaitsForReady(t *testing.T) {
k8s, ela := fakeClients()
ela.ServingV1alpha1().Revisions(testNamespace).Create(
k8s, kna := fakeClients()
kna.ServingV1alpha1().Revisions(testNamespace).Create(
newRevisionBuilder().
withServingState(v1alpha1.RevisionServingStateReserve).
withReady(false).
build())
k8s.CoreV1().Endpoints(testNamespace).Create(newEndpointBuilder().build())
a := NewRevisionActivator(k8s, ela)
k8s.CoreV1().Services(testNamespace).Create(newServiceBuilder().build())
a := NewRevisionActivator(k8s, kna)

ch := make(chan activationResult)
go func() {
Expand All @@ -133,17 +135,17 @@ func TestActiveEndpoint_Reserve_WaitsForReady(t *testing.T) {
default:
}

rev, _ := ela.ServingV1alpha1().Revisions(testNamespace).Get(testRevision, metav1.GetOptions{})
rev, _ := kna.ServingV1alpha1().Revisions(testNamespace).Get(testRevision, metav1.GetOptions{})
rev.Status.SetCondition(&v1alpha1.RevisionCondition{
Type: v1alpha1.RevisionConditionReady,
Status: corev1.ConditionTrue,
})
ela.ServingV1alpha1().Revisions(testNamespace).Update(rev)
kna.ServingV1alpha1().Revisions(testNamespace).Update(rev)

time.Sleep(100 * time.Millisecond)
time.Sleep(3 * time.Second)
select {
case result := <-ch:
want := Endpoint{"ip", 8080}
want := Endpoint{testServiceFQDN, 8080}
if result.endpoint != want {
t.Errorf("Unexpected endpoint. Want %+v. Got %+v.", want, result.endpoint)
}
Expand All @@ -159,14 +161,14 @@ func TestActiveEndpoint_Reserve_WaitsForReady(t *testing.T) {
}

func TestActiveEndpoint_Reserve_ReadyTimeoutWithError(t *testing.T) {
k8s, ela := fakeClients()
ela.ServingV1alpha1().Revisions(testNamespace).Create(
k8s, kna := fakeClients()
kna.ServingV1alpha1().Revisions(testNamespace).Create(
newRevisionBuilder().
withServingState(v1alpha1.RevisionServingStateReserve).
withReady(false).
build())
k8s.CoreV1().Endpoints(testNamespace).Create(newEndpointBuilder().build())
a := NewRevisionActivator(k8s, ela)
k8s.CoreV1().Services(testNamespace).Create(newServiceBuilder().build())
a := NewRevisionActivator(k8s, kna)
a.(*revisionActivator).readyTimout = 200 * time.Millisecond

ch := make(chan activationResult)
Expand All @@ -182,7 +184,7 @@ func TestActiveEndpoint_Reserve_ReadyTimeoutWithError(t *testing.T) {
default:
}

time.Sleep(200 * time.Millisecond)
time.Sleep(3 * time.Second)
select {
case result := <-ch:
if got, want := result.endpoint, (Endpoint{}); got != want {
Expand All @@ -206,7 +208,7 @@ func fakeClients() (kubernetes.Interface, clientset.Interface) {
Namespace: "",
},
}
return fakeK8s.NewSimpleClientset(nsObj), fakeEla.NewSimpleClientset()
return fakeK8s.NewSimpleClientset(nsObj), fakeKna.NewSimpleClientset()
}

type revisionBuilder struct {
Expand Down Expand Up @@ -262,37 +264,30 @@ func (b *revisionBuilder) withReady(ready bool) *revisionBuilder {
return b
}

type endpointBuilder struct {
endpoint *corev1.Endpoints
type serviceBuilder struct {
service *corev1.Service
}

func newEndpointBuilder() *endpointBuilder {
return &endpointBuilder{
endpoint: &corev1.Endpoints{
func newServiceBuilder() *serviceBuilder {
return &serviceBuilder{

service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-rev-service",
Name: testService,
Namespace: testNamespace,
},
Subsets: []corev1.EndpointSubset{
corev1.EndpointSubset{
Addresses: []corev1.EndpointAddress{
corev1.EndpointAddress{IP: "ip"},
},
Ports: []corev1.EndpointPort{
corev1.EndpointPort{Port: int32(8080)},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 8080,
},
},
},
},
}
}

func (b *endpointBuilder) build() *corev1.Endpoints {
return b.endpoint
}

func (b *endpointBuilder) withAddressPort(address string, port int32) *endpointBuilder {
b.endpoint.Subsets[0].Addresses[0].IP = address
b.endpoint.Subsets[0].Ports[0].Port = port
return b
func (b *serviceBuilder) build() *corev1.Service {
return b.service
}