diff --git a/BUILD.bazel b/BUILD.bazel index 5a2db926a9ed..744f57823e32 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -32,6 +32,14 @@ k8s_object( template = "webhook.yaml", ) +k8s_object( + name = "activator", + images = { + "ela-activator:latest": "//cmd/ela-activator:image", + }, + template = "activator.yaml", +) + k8s_object( name = "namespace", template = "namespace.yaml", @@ -57,6 +65,11 @@ k8s_object( template = "elawebhookservice.yaml", ) +k8s_object( + name = "ela-activator-service", + template = "ela-activator-service.yaml", +) + k8s_object( name = "configuration", template = "configuration.yaml", @@ -109,6 +122,8 @@ k8s_objects( ":controllerservice", ":webhook", ":elawebhookservice", + ":activator", + ":ela-activator-service", ], ) diff --git a/activator.yaml b/activator.yaml new file mode 100644 index 000000000000..207b8afa04d8 --- /dev/null +++ b/activator.yaml @@ -0,0 +1,39 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: ela-activator + namespace: ela-system +spec: + # TODO: Increase the replicas + # https://github.com/elafros/elafros/issues/695 + replicas: 1 + template: + metadata: + labels: + app: ela-activator + role: ela-activator + spec: + serviceAccountName: ela-controller + containers: + - name: ela-activator + image: ela-activator:latest + ports: + - name: http + containerPort: 8080 + args: + - "-logtostderr=true" + - "-stderrthreshold=INFO" diff --git a/cmd/ela-activator/BUILD.bazel b/cmd/ela-activator/BUILD.bazel new file mode 100644 index 000000000000..d9375c9abb6a --- /dev/null +++ b/cmd/ela-activator/BUILD.bazel @@ -0,0 +1,31 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importpath = "github.com/elafros/elafros/cmd/ela-activator", + visibility = ["//visibility:private"], + deps = [ + "//pkg/activator:go_default_library", + "//pkg/client/clientset/versioned:go_default_library", + "//pkg/signals:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + ], +) + +go_binary( + name = "ela-activator", + embed = [":go_default_library"], + pure = "on", + visibility = ["//visibility:public"], +) + +load("@io_bazel_rules_docker//go:image.bzl", "go_image") + +go_image( + name = "image", + binary = ":ela-activator", + visibility = ["//visibility:public"], +) diff --git a/cmd/ela-activator/main.go b/cmd/ela-activator/main.go new file mode 100644 index 000000000000..170c9373e161 --- /dev/null +++ b/cmd/ela-activator/main.go @@ -0,0 +1,54 @@ +/* +Copyright 2018 Google Inc. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "net/http" + + "github.com/elafros/elafros/pkg/activator" + clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" + "github.com/elafros/elafros/pkg/signals" + "github.com/golang/glog" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +func main() { + flag.Parse() + glog.Info("Starting the elafros activator...") + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + clusterConfig, err := rest.InClusterConfig() + if err != nil { + glog.Fatal(err) + } + kubeClient, err := kubernetes.NewForConfig(clusterConfig) + if err != nil { + glog.Fatal(err) + } + elaClient, err := clientset.NewForConfig(clusterConfig) + if err != nil { + glog.Fatalf("Error building ela clientset: %v", err) + } + a, err := activator.NewActivator(kubeClient, elaClient, http.DefaultTransport.(*http.Transport)) + if err != nil { + glog.Fatalf("Failed to create an activator: %v", err) + } + + a.Run(stopCh) + glog.Flush() +} diff --git a/controller.yaml b/controller.yaml index f061f68c6cda..767bf04c244c 100644 --- a/controller.yaml +++ b/controller.yaml @@ -37,6 +37,8 @@ spec: - ela-queue:latest - "-autoscalerImage" - ela-autoscaler:latest + # The flag to turn on the activator. 
+ - "-enableActivatorExperiment=false" ports: - name: metrics containerPort: 9090 diff --git a/ela-activator-service.yaml b/ela-activator-service.yaml new file mode 100644 index 000000000000..b16398e5cbbe --- /dev/null +++ b/ela-activator-service.yaml @@ -0,0 +1,30 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Service +metadata: + name: activator-service + namespace: ela-system + labels: + app: ela-activator +spec: + selector: + app: ela-activator + ports: + - name: http + protocol: TCP + port: 80 + targetPort: 8080 + type: NodePort diff --git a/pkg/activator/BUILD.bazel b/pkg/activator/BUILD.bazel index e69de29bb2d1..019d0c36a6db 100644 --- a/pkg/activator/BUILD.bazel +++ b/pkg/activator/BUILD.bazel @@ -0,0 +1,34 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "activator.go", + ], + importpath = "github.com/elafros/elafros/pkg/activator", + visibility = ["//visibility:public"], + deps = [ + "//pkg/apis/ela/v1alpha1:go_default_library", + "//pkg/client/clientset/versioned:go_default_library", + "//pkg/controller:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + 
"//vendor/k8s.io/client-go/util/workqueue:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["activator_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/apis/ela:go_default_library", + "//pkg/client/clientset/versioned/fake:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + ], +) diff --git a/pkg/activator/README.md b/pkg/activator/README.md new file mode 100644 index 000000000000..09cc49554251 --- /dev/null +++ b/pkg/activator/README.md @@ -0,0 +1 @@ +See the [section](https://github.com/elafros/elafros/blob/master/pkg/autoscaler/README.md#activator) in the Autoscaler documentation. diff --git a/pkg/activator/activator.go b/pkg/activator/activator.go new file mode 100644 index 000000000000..866688041f79 --- /dev/null +++ b/pkg/activator/activator.go @@ -0,0 +1,342 @@ +/* +Copyright 2018 Google Inc. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package activator + +import ( + "fmt" + "net/http" + "net/http/httputil" + "net/url" + "strings" + "sync" + + "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" + clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" + "github.com/elafros/elafros/pkg/controller" + "github.com/golang/glog" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +// Activator that can activate revisions in reserve state or redirect traffic to active revisions. +type Activator struct { + kubeClient kubernetes.Interface + elaClient clientset.Interface + // A RoundTripper member allows us to pass fake tripper in test to mimic a revision. + tripper http.RoundTripper + chans Channels +} + +// Channels hold all channels for activating revisions. +type Channels struct { + activateCh chan (string) + activationDoneCh chan (string) + badRevisionCh chan (string) + revisionRequestCh chan (RevisionRequest) + watchCh chan (string) +} + +// RevisionRequest holds the http request information. +type RevisionRequest struct { + name string + namespace string + w http.ResponseWriter + r *http.Request + active bool + doneCh chan struct{} +} + +const ( + requestQueueLength = 100 +) + +// NewActivator returns an Activator. 
+func NewActivator(kubeClient kubernetes.Interface, elaClient clientset.Interface, tripper http.RoundTripper) (*Activator, error) { + return &Activator{ + kubeClient: kubeClient, + elaClient: elaClient, + tripper: tripper, + chans: Channels{ + activateCh: make(chan string, requestQueueLength), + activationDoneCh: make(chan string, requestQueueLength), + badRevisionCh: make(chan string, requestQueueLength), + revisionRequestCh: make(chan RevisionRequest, requestQueueLength), + watchCh: make(chan string, requestQueueLength), + }, + }, nil +} + +func getRevisionKey(namespace string, name string) string { + return namespace + "/" + name +} + +func getRevisionNameFromKey(key string) (namespace string, name string, err error) { + arr := strings.Split(key, "/") + if len(arr) != 2 { + glog.Errorf("Invalid revision key ", key) + return "", "", fmt.Errorf("Invalid revision key %s", key) + } + return arr[0], arr[1], nil +} + +func (a *Activator) getRevisionTargetURL(revision *v1alpha1.Revision) (*url.URL, error) { + endpoint, err := a.kubeClient.CoreV1().Endpoints(revision.GetNamespace()).Get( + controller.GetElaK8SServiceNameForRevision(revision), metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + if len(endpoint.Subsets[0].Ports) != 1 { + return nil, fmt.Errorf("need just one port. Found %v ports", len(endpoint.Subsets[0].Ports)) + } + // TODO: figure out why do we need to use the pod IP directly to avoid the delay. + // We should be able to use the k8s service cluster IP. 
+ // https://github.com/elafros/elafros/issues/660 + ip := endpoint.Subsets[0].Addresses[0].IP + port := endpoint.Subsets[0].Ports[0].Port + u := &url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", ip, port), + } + return u, nil +} + +func (a *Activator) proxyRequest(revRequest RevisionRequest, serviceURL *url.URL) { + glog.Infof("Sending a proxy request to %q", serviceURL) + proxy := httputil.NewSingleHostReverseProxy(serviceURL) + proxy.Transport = a.tripper + proxy.ServeHTTP(revRequest.w, revRequest.r) + // Make sure the handler function exits after ServeHTTP function. + revRequest.doneCh <- struct{}{} + glog.Info("End proxy request") +} + +func (a *Activator) proxyRequests(revKey string, requests []RevisionRequest) { + glog.Infof("Sending %d requests to revision %s.", len(requests), revKey) + if len(requests) == 0 { + return + } + + revision, err := a.getRevision(requests[0].namespace, requests[0].name) + if err != nil { + glog.Errorf("Failed to get revision %s, %q", revKey, err) + a.stopRequests(revKey, requests) + return + } + serviceURL, err := a.getRevisionTargetURL(revision) + if err != nil { + glog.Errorf("Failed to get service URL for revision %s, %q", revKey, err) + a.stopRequests(revKey, requests) + return + } + // TODO: Consider sending the requests in parallel. 
+ for i := range requests { + a.proxyRequest(requests[i], serviceURL) + } +} + +func (a *Activator) stopRequests(revKey string, requests []RevisionRequest) { + glog.Infof("Write to response for %d bad requests for revision %s.", len(requests), revKey) + for _, revRequest := range requests { + http.Error(revRequest.w, "Bad request.", http.StatusInternalServerError) + revRequest.doneCh <- struct{}{} + } +} + +func (a *Activator) updateRevision(revision *v1alpha1.Revision) error { + revisionClient := a.elaClient.ElafrosV1alpha1().Revisions(revision.Namespace) + _, err := revisionClient.Update(revision) + if err != nil { + glog.Errorf("Failed to update the revision: %s/%s", revision.GetNamespace(), revision.GetName()) + return err + } + return nil +} + +func (a *Activator) getRevisionFromKey(revKey string) (*v1alpha1.Revision, error) { + ns, name, err := getRevisionNameFromKey(revKey) + if err != nil { + return nil, err + } + return a.getRevision(ns, name) +} + +func (a *Activator) getRevision(ns string, name string) (*v1alpha1.Revision, error) { + revisionClient := a.elaClient.ElafrosV1alpha1().Revisions(ns) + revision, err := revisionClient.Get(name, metav1.GetOptions{}) + if err != nil { + glog.Errorf("Unable to get revision %s/%s", ns, name) + return nil, err + } + return revision, nil +} + +func (a *Activator) activate(revKey string) { + glog.Info("Revision to be activated: ", revKey) + revision, err := a.getRevisionFromKey(revKey) + if err != nil { + glog.Errorf("Failed to get revision from the key.") + a.chans.badRevisionCh <- revKey + return + } + revision.Spec.ServingState = v1alpha1.RevisionServingStateActive + if err := a.updateRevision(revision); err != nil { + glog.Errorf("Failed to update revision.") + a.chans.badRevisionCh <- revKey + return + } + glog.Infof("Updated the revision: %s", revision.GetName()) +} + +func (a *Activator) watchForReady(revKey string) { + glog.Infof("Watching for revision %s to be ready", revKey) + revision, err := 
a.getRevisionFromKey(revKey) + if err != nil { + glog.Errorf("Failed to get revision from the key.") + a.chans.badRevisionCh <- revKey + return + } + wi, err := a.elaClient.ElafrosV1alpha1().Revisions(revision.GetNamespace()).Watch(metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", revision.GetName()), + }) + if err != nil { + glog.Errorf("Failed to watch the revision %s.", revKey) + a.chans.badRevisionCh <- revKey + return + } + defer wi.Stop() + ch := wi.ResultChan() + for { + event := <-ch + if rev, ok := event.Object.(*v1alpha1.Revision); ok { + if !rev.Status.IsReady() { + continue + } + a.chans.activationDoneCh <- revKey + glog.Infof("Revision %s is ready.", revKey) + return + } + } +} + +// The main method to process requests. Only active or reserved revisions reach here. +func (a *Activator) process(quitCh chan struct{}) { + var pendingRequests sync.Map //map[string][]RevisionRequest + for { + select { + case revReq := <-a.chans.revisionRequestCh: + revKey := getRevisionKey(revReq.namespace, revReq.name) + revRequests, found := pendingRequests.Load(revKey) + if !found { + revRequests = []RevisionRequest{} + pendingRequests.Store(revKey, revRequests) + // Only put the first reserved revision to the activateCh. 
+ if !revReq.active { + glog.Infof("Add %s to activate channel", revKey) + a.chans.activateCh <- revKey + } + // Add a watch for each unique revision + glog.Infof("Add %s to watch channel", revKey) + a.chans.watchCh <- revKey + } + pendingRequests.Store(revKey, append(revRequests.([]RevisionRequest), revReq)) + case revToWatch := <-a.chans.watchCh: + go a.watchForReady(revToWatch) + case revToActivate := <-a.chans.activateCh: + go a.activate(revToActivate) + case revDone := <-a.chans.activationDoneCh: + revRequests, found := pendingRequests.Load(revDone) + if found { + pendingRequests.Delete(revDone) + go a.proxyRequests(revDone, revRequests.([]RevisionRequest)) + } else { + glog.Errorf("The revision %s is unexpected in activator", revDone) + } + case badRev := <-a.chans.badRevisionCh: + revRequests, found := pendingRequests.Load(badRev) + if found { + pendingRequests.Delete(badRev) + go a.stopRequests(badRev, revRequests.([]RevisionRequest)) + } else { + glog.Errorf("The revision %s is unexpected in activator", badRev) + } + case <-quitCh: + pendingRequests.Range(func(revKey, revRequests interface{}) bool { + a.chans.badRevisionCh <- revKey.(string) + return true + }) + } + } +} + +func (a *Activator) handler(w http.ResponseWriter, r *http.Request) { + // TODO: Use the namespace from the header. 
+ // https://github.com/elafros/elafros/issues/693 + revisionClient := a.elaClient.ElafrosV1alpha1().Revisions("default") + revisionName := r.Header.Get(controller.GetRevisionHeaderName()) + revision, err := revisionClient.Get(revisionName, metav1.GetOptions{}) + if err != nil { + http.Error(w, "Unable to get revision.", http.StatusNotFound) + return + } + glog.Infof("Found revision %s in namespace %s", revision.GetName(), revision.GetNamespace()) + switch revision.Spec.ServingState { + case v1alpha1.RevisionServingStateActive: + glog.Infof("The revision %s/%s is active.", revision.GetNamespace(), revision.GetName()) + revRequest := RevisionRequest{ + name: revision.GetName(), + namespace: revision.GetNamespace(), + r: r, + w: w, + active: true, + doneCh: make(chan struct{}), + } + a.chans.revisionRequestCh <- revRequest + <-revRequest.doneCh + case v1alpha1.RevisionServingStateReserve: + glog.Infof("The revision %s/%s is inactive.", revision.GetNamespace(), revision.GetName()) + revRequest := RevisionRequest{ + name: revision.GetName(), + namespace: revision.GetNamespace(), + r: r, + w: w, + active: false, + doneCh: make(chan struct{}), + } + a.chans.revisionRequestCh <- revRequest + <-revRequest.doneCh + case v1alpha1.RevisionServingStateRetired: + glog.Info("revision is retired. do nothing.") + http.Error(w, "Retired revision.", http.StatusServiceUnavailable) + default: + glog.Errorf("unrecognized revision serving status: %s", revision.Spec.ServingState) + http.Error(w, "Unknown revision status.", http.StatusServiceUnavailable) + } +} + +// Run will set up the event handler for requests. 
+func (a *Activator) Run(stopCh <-chan struct{}) error { + glog.Info("Started Activator") + quitCh := make(chan struct{}) + go a.process(quitCh) + http.HandleFunc("/", a.handler) + http.ListenAndServe(":8080", nil) + <-stopCh + quitCh <- struct{}{} + glog.Info("Shutting down Activator") + return nil +} diff --git a/pkg/activator/activator_test.go b/pkg/activator/activator_test.go new file mode 100644 index 000000000000..b1a24ae3c785 --- /dev/null +++ b/pkg/activator/activator_test.go @@ -0,0 +1,325 @@ +/* +Copyright 2018 Google Inc. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package activator + +import ( + "bytes" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/elafros/elafros/pkg/apis/ela" + "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" + fakeclientset "github.com/elafros/elafros/pkg/client/clientset/versioned/fake" + "github.com/elafros/elafros/pkg/controller" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" +) + +const ( + testNamespace string = "default" + testRevisionName string = "test-rev" + testPort = 1234 + createRevision = true + doNotCreateRevision = false +) + +// FakeRoundTripper serves as a fake transport +type FakeRoundTripper struct{} + +// RoundTrip returns a response with status 200. 
// getHTTPRequest returns a GET request for /api/projects whose
// Elafros-Revision header is set to revisionName. The test is aborted if the
// request cannot be built, so the returned request is never nil.
func getHTTPRequest(t *testing.T, revisionName string) *http.Request {
	t.Helper()
	req, err := http.NewRequest("GET", "/api/projects", nil)
	if err != nil {
		// Fatalf instead of Error: continuing would dereference a nil request.
		t.Fatalf("Failed to create http request: %v", err)
	}
	req.Header.Set("Elafros-Revision", revisionName)
	return req
}
+func createRevisions(t *testing.T, kubeClient *fakekubeclientset.Clientset, elaClient *fakeclientset.Clientset, revisions ...*v1alpha1.Revision) { + for _, rev := range revisions { + if rev != nil { + // Add the revision + elaClient.ElafrosV1alpha1().Revisions(rev.GetNamespace()).Create(rev) + // Add the k8s service + kubeClient.CoreV1().Services(rev.GetNamespace()).Create( + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.GetElaK8SServiceNameForRevision(rev), + Namespace: rev.GetNamespace(), + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "abc", + Ports: []corev1.ServicePort{ + { + Name: "test-port", + Port: int32(testPort), + TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: testPort}, + }, + }, + Type: "NodePort", + }, + }, + ) + // Add the k8s endpoint + kubeClient.CoreV1().Endpoints(rev.GetNamespace()).Create( + &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.GetElaK8SServiceNameForRevision(rev), + Namespace: rev.GetNamespace(), + }, + Subsets: []corev1.EndpointSubset{ + corev1.EndpointSubset{ + Addresses: []corev1.EndpointAddress{ + corev1.EndpointAddress{ + IP: "abc", + }, + }, + Ports: []corev1.EndpointPort{ + corev1.EndpointPort{ + Port: 1234, + }, + }, + }, + }, + }, + ) + } + } +} + +func getActivator(t *testing.T, kubeClient *fakekubeclientset.Clientset, elaClient *fakeclientset.Clientset) *Activator { + tripper := FakeRoundTripper{} + a, err := NewActivator(kubeClient, elaClient, tripper) + if err != nil { + t.Fatal("Failed to create an activator!") + } + return a +} + +func TestGetRevisionTargetURL(t *testing.T) { + reservedRev := getTestRevision(v1alpha1.RevisionServingStateReserve, testRevisionName) + elaClient := fakeclientset.NewSimpleClientset() + kubeClient := fakekubeclientset.NewSimpleClientset() + createRevisions(t, kubeClient, elaClient, reservedRev) + a := getActivator(t, kubeClient, elaClient) + targetURL, err := a.getRevisionTargetURL(reservedRev) + if err != nil { + t.Errorf("Error in 
// testHandlerRevision sends one request for testRevisionName through the
// activator handler and checks the recorded status code against
// expectedStatus. The revision is built with the given servingState and is
// added to the fake clients only when addRevToEnv is true.
func testHandlerRevision(t *testing.T, servingState v1alpha1.RevisionServingStateType, expectedStatus int, addRevToEnv bool) {
	rev := getTestRevision(servingState, testRevisionName)
	kubeClient := fakekubeclientset.NewSimpleClientset()
	elaClient := fakeclientset.NewSimpleClientset()
	if addRevToEnv {
		createRevisions(t, kubeClient, elaClient, rev)
	}
	// signal is written by handleRequst once the handler has returned.
	signal := make(chan struct{})
	a := getActivator(t, kubeClient, elaClient)
	quitCh := make(chan struct{})
	// Start the processing loop that the handler hands its requests to.
	go a.process(quitCh)
	req := getHTTPRequest(t, testRevisionName)
	// response recorder to record the response
	responseRecorder := httptest.NewRecorder()
	// Our handlers satisfy http.Handler, so we can call their ServeHTTP method
	// directly and pass in our Request and ResponseRecorder.
	handler := http.HandlerFunc(a.handler)
	// The handler blocks until the request has been answered, so it runs in
	// its own goroutine while this goroutine updates the revision.
	go handleRequst(handler, responseRecorder, req, signal)
	// Called unconditionally, also when the revision was not added above.
	setRevisionReady(elaClient, rev)
	<-signal
	if status := responseRecorder.Code; status != expectedStatus {
		t.Errorf("handler returned wrong status code: got %v want %v",
			status, expectedStatus)
	}
	quitCh <- struct{}{}
}
// Test for a revision with retired status: the handler is expected to answer
// with 503 Service Unavailable.
func TestHandler_retiredRevision(t *testing.T) {
	testHandlerRevision(t, v1alpha1.RevisionServingStateRetired, http.StatusServiceUnavailable, createRevision)
}
// Test with two unique revisions, one active and one in reserve, so that
// pendingRequests holds more than one entry. Three requests go to the active
// revision and two to the reserve one.
func TestHandler_twoUniqueRevisions(t *testing.T) {
	rev1 := getTestRevision(v1alpha1.RevisionServingStateActive, "test-rev1")
	rev2 := getTestRevision(v1alpha1.RevisionServingStateReserve, "test-rev2")
	revMap := make(map[*v1alpha1.Revision]int)
	revMap[rev1] = 3
	revMap[rev2] = 2
	testHandlerMultipleRevisions(t, revMap)
}
// RouteRuleSpec is the spec section of an Istio route rule, serialized under
// the JSON keys given in the field tags.
type RouteRuleSpec struct {
	// Destination is stored under the "destination" key.
	Destination IstioService `json:"destination"`
	// Match is stored under the "match" key.
	Match Match `json:"match,omitempty"`
	// Route lists the weighted destinations of the rule.
	Route []DestinationWeight `json:"route"`
	// AppendHeaders maps header names to values and is stored under the
	// "appendHeaders" key (e.g. istio-custom-header: user-defined-value).
	AppendHeaders map[string]string `json:"appendHeaders"`
}
configuration is already ready and has LatestReadyRevisionName equal - // to LatestCreatedRevisionName, so ignore this event. - glog.Infof("Configuration %q is already ready with latest created revision ready", revisionName) - return - } - // Don't modify the informer's copy. config = config.DeepCopy() diff --git a/pkg/controller/configuration/configuration_test.go b/pkg/controller/configuration/configuration_test.go index 4241854924a9..f6b01cc9ae2c 100644 --- a/pkg/controller/configuration/configuration_test.go +++ b/pkg/controller/configuration/configuration_test.go @@ -44,7 +44,6 @@ import ( ctrl "github.com/elafros/elafros/pkg/controller" "k8s.io/client-go/rest" - kubetesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" kubeinformers "k8s.io/client-go/informers" @@ -474,14 +473,6 @@ func TestDoNotUpdateConfigurationWhenLatestReadyRevisionNameIsUpToDate(t *testin }}, } - // In this case, we can't tell if addRevisionEvent has updated the - // Configuration, because it's already in the expected state. Use a reactor - // instead to test whether Update() is called. 
- elaClient.Fake.PrependReactor("update", "configurations", func(a kubetesting.Action) (bool, runtime.Object, error) { - t.Error("Configuration was updated unexpectedly") - return true, nil, nil - }) - controller.addRevisionEvent(revision) } diff --git a/pkg/controller/names.go b/pkg/controller/names.go index ed5ba6de36d1..557c05b55d5c 100644 --- a/pkg/controller/names.go +++ b/pkg/controller/names.go @@ -65,8 +65,16 @@ func GetElaK8SServiceName(u *v1alpha1.Route) string { return u.Name + "-service" } -func GetElaK8SRouterServiceName(u *v1alpha1.Route) string { - return "router-service" +func GetElaK8SActivatorServiceName() string { + return "activator-service" +} + +func GetElaK8SActivatorNamespace() string { + return "ela-system" +} + +func GetRevisionHeaderName() string { + return "Elafros-Revision" } func GetOrCreateRevisionNamespace(ns string, c clientset.Interface) (string, error) { diff --git a/pkg/controller/revision/revision.go b/pkg/controller/revision/revision.go index 861236d668c5..b656c2176fc6 100644 --- a/pkg/controller/revision/revision.go +++ b/pkg/controller/revision/revision.go @@ -505,6 +505,14 @@ func getIsServiceReady(e *corev1.Endpoints) bool { return false } +func getRevisionLastTransitionTime(r *v1alpha1.Revision) time.Time { + condCount := len(r.Status.Conditions) + if condCount == 0 { + return r.CreationTimestamp.Time + } + return r.Status.Conditions[condCount-1].LastTransitionTime.Time +} + func (c *Controller) addBuildEvent(obj interface{}) { build := obj.(*buildv1alpha1.Build) @@ -575,7 +583,7 @@ func (c *Controller) addEndpointsEvent(obj interface{}) { return } - revisionAge := time.Now().Sub(rev.CreationTimestamp.Time) + revisionAge := time.Now().Sub(getRevisionLastTransitionTime(rev)) if revisionAge < serviceTimeoutDuration { return } diff --git a/pkg/controller/route/ela_ingress.go b/pkg/controller/route/ela_ingress.go index 183265eedc92..690090d8dd47 100644 --- a/pkg/controller/route/ela_ingress.go +++ 
b/pkg/controller/route/ela_ingress.go @@ -40,10 +40,13 @@ func MakeRouteIngress(route *v1alpha1.Route) *v1beta1.Ingress { fmt.Sprintf("*.%s", route.Status.Domain), } - // By default we map to the placeholder service directly. - // This would point to 'router' component if we wanted to use - // this method for 0->1 case. + // This would point to 'activator' component if enableActivatorExperiment is true. + namespace := route.Namespace serviceName := controller.GetElaK8SServiceName(route) + if enableActivatorExperiment { + namespace = controller.GetElaK8SActivatorNamespace() + serviceName = controller.GetElaK8SActivatorServiceName() + } path := v1beta1.HTTPIngressPath{ Backend: v1beta1.IngressBackend{ @@ -68,7 +71,7 @@ func MakeRouteIngress(route *v1alpha1.Route) *v1beta1.Ingress { return &v1beta1.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: controller.GetElaK8SIngressName(route), - Namespace: route.Namespace, + Namespace: namespace, Annotations: map[string]string{ "kubernetes.io/ingress.class": "istio", }, diff --git a/pkg/controller/route/ela_istio_route.go b/pkg/controller/route/ela_istio_route.go index d4419b802077..686740dfed14 100644 --- a/pkg/controller/route/ela_istio_route.go +++ b/pkg/controller/route/ela_istio_route.go @@ -21,37 +21,66 @@ import ( "regexp" "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" - "github.com/elafros/elafros/pkg/controller" - istiov1alpha2 "github.com/elafros/elafros/pkg/apis/istio/v1alpha2" - + "github.com/elafros/elafros/pkg/controller" + "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // makeIstioRouteSpec creates an Istio route func makeIstioRouteSpec(u *v1alpha1.Route, tt *v1alpha1.TrafficTarget, ns string, routes []RevisionRoute, domain string) istiov1alpha2.RouteRuleSpec { - // if either current or next is inactive, target them to proxy instead of - // the backend so the 0->1 transition will happen. 
+ destinationWeights := []istiov1alpha2.DestinationWeight{} placeHolderK8SServiceName := controller.GetElaK8SServiceName(u) - destinationWeights := calculateDestinationWeights(u, tt, routes) - if tt != nil { - domain = fmt.Sprintf("%s.%s", tt.Name, domain) + // TODO: Set up routerules in different namespace, direct traffic to activator only when necessary. + // https://github.com/elafros/elafros/issues/607 + if !enableActivatorExperiment { + destinationWeights = calculateDestinationWeights(u, tt, routes) + if tt != nil { + domain = fmt.Sprintf("%s.%s", tt.Name, domain) + } + + return istiov1alpha2.RouteRuleSpec{ + Destination: istiov1alpha2.IstioService{ + Name: placeHolderK8SServiceName, + }, + Match: istiov1alpha2.Match{ + Request: istiov1alpha2.MatchRequest{ + Headers: istiov1alpha2.Headers{ + Authority: istiov1alpha2.MatchString{ + Regex: regexp.QuoteMeta(domain), + }, + }, + }, + }, + Route: destinationWeights, + } } + // if enableActivatorExperiment flag is on, and there are reserved revisions, + // define the corresponding istio route rules. + glog.Info("using activator-service as the destination") + placeHolderK8SServiceName = controller.GetElaK8SActivatorServiceName() + destinationWeights = append(destinationWeights, + istiov1alpha2.DestinationWeight{ + Destination: istiov1alpha2.IstioService{ + Name: placeHolderK8SServiceName, + }, + Weight: 100, + }) + + appendHeaders := make(map[string]string) + if len(u.Status.Traffic) > 0 { + // TODO: Deal with the case when the route has more than one traffic targets. + // Note this has dependency on istio features. 
+ // https://github.com/elafros/elafros/issues/693 + appendHeaders[controller.GetRevisionHeaderName()] = u.Status.Traffic[0].RevisionName + } return istiov1alpha2.RouteRuleSpec{ Destination: istiov1alpha2.IstioService{ Name: placeHolderK8SServiceName, }, - Match: istiov1alpha2.Match{ - Request: istiov1alpha2.MatchRequest{ - Headers: istiov1alpha2.Headers{ - Authority: istiov1alpha2.MatchString{ - Regex: regexp.QuoteMeta(domain), - }, - }, - }, - }, - Route: destinationWeights, + Route: destinationWeights, + AppendHeaders: appendHeaders, } } diff --git a/pkg/controller/route/route.go b/pkg/controller/route/route.go index 176b9d93b79c..bc430db7dd4b 100644 --- a/pkg/controller/route/route.go +++ b/pkg/controller/route/route.go @@ -19,6 +19,7 @@ package route import ( "context" "errors" + "flag" "fmt" "reflect" "time" @@ -56,6 +57,9 @@ var ( "Counter to keep track of items in the route work queue.", stats.UnitNone) statusTagKey tag.Key + // The experiment flag in controller.yaml to turn on activator feature. The default is false. + // If it's true, the traffic will always be directed to the activator. + enableActivatorExperiment bool ) const ( @@ -105,6 +109,10 @@ type Controller struct { controllerConfig controller.Config } +func init() { + flag.BoolVar(&enableActivatorExperiment, "enableActivatorExperiment", false, "The experiment flag to turn on activator feature.") +} + // NewController initializes the controller and is called by the generated code // Registers eventhandlers to enqueue events // config - client configuration for talking to the apiserver @@ -334,7 +342,7 @@ func (c *Controller) updateRouteEvent(key string) error { glog.Infof("Running reconcile Route for %s\n%+v\n", route.Name, route) // Create a placeholder service that is simply used by istio as a placeholder. 
- // This service could eventually be the 'router' service that will get all the + // This service could eventually be the 'activator' service that will get all the // fallthrough traffic if there are no route rules (revisions to target). // This is one way to implement the 0->1. For now, we'll just create a placeholder // that selects nothing. @@ -433,14 +441,18 @@ func (c *Controller) reconcilePlaceholderService(route *v1alpha1.Route) error { } func (c *Controller) reconcileIngress(route *v1alpha1.Route) error { + ingressNamespace := route.Namespace + if enableActivatorExperiment { + ingressNamespace = controller.GetElaK8SActivatorNamespace() + } ingress := MakeRouteIngress(route) - ingressClient := c.kubeclientset.Extensions().Ingresses(route.Namespace) + ingressClient := c.kubeclientset.Extensions().Ingresses(ingressNamespace) existing, err := ingressClient.Get(controller.GetElaK8SIngressName(route), metav1.GetOptions{}) if err != nil { if apierrs.IsNotFound(err) { if _, err = ingressClient.Create(ingress); err == nil { - glog.Infof("Created ingress %q", ingress.Name) - c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Ingress %q", ingress.Name) + glog.Infof("Created ingress %q in namespace %q", ingress.Name, ingressNamespace) + c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Ingress %q in namespace %q", ingress.Name, ingressNamespace) } } return err @@ -449,8 +461,8 @@ func (c *Controller) reconcileIngress(route *v1alpha1.Route) error { if !reflect.DeepEqual(existing.Spec, ingress.Spec) { existing.Spec = ingress.Spec if _, err = ingressClient.Update(existing); err == nil { - glog.Infof("Updated ingress %q", ingress.Name) - c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Ingress %q", ingress.Name) + glog.Infof("Updated ingress %q in namespace %q", ingress.Name, ingressNamespace) + c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Ingress %q in namespace %q", ingress.Name, 
ingressNamespace) } return err } @@ -641,9 +653,6 @@ func (c *Controller) createOrUpdateRouteRules(route *v1alpha1.Route, configMap m glog.Errorf("No routes were found for the service %q", route.Name) return nil, nil } - for _, rr := range revisionRoutes { - glog.Infof("Adding a route to %q Weight: %d", rr.Service, rr.Weight) - } // Create route rule for the route domain routeRuleName := controller.GetRouteRuleName(route, nil)