activator for 0->1 case #341
Conversation
| kind: Pod | ||
| metadata: | ||
| name: ela-activator | ||
| # TODO: use namespace: ela-system for activator |
Just curious: why can't we use namespace ela-system right now?
There are two reasons:
- the ingress object we created has to be in the same namespace as the revision.
- the ingress object which defines the backend service (activator-service in this case) has to be in the same namespace as the backend service.
Cross-namespace Ingress kubernetes/kubernetes#17088
It seems the activator is similar to autoscaler on this. #324
I thought in Istio route rules you specified both a service name and its namespace... I may be misremembering.
https://github.com/elafros/elafros/blob/4865b17e86a07bdafa886316b121163da179222d/pkg/controller/route/ela_ingress.go#L62
The ingress namespace needs to be the same as the revision, which is default.
I saw your link for Istio service namespace. I'll try it in the next change. Thanks!
| @@ -0,0 +1,159 @@ | |||
| package main | |||
split main package into //cmd/activator/
|
|
||
| // A layer on top of http.DefaultTransport, with retries. | ||
| // Forked from https://github.com/fission/fission/blob/746c51901da590cff09317dbe59aa19241211812/router/functionHandler.go#L53 | ||
| type RetryingRoundTripper struct { |
put this into its own package?
I put this in its own file.
| istiov1alpha2.DestinationWeight{ | ||
| Destination: istiov1alpha2.IstioService{ | ||
| Name: route.Service, | ||
| Name: "activator-service.default.svc.cluster.local", |
This probably shouldn't hardcode "default"
| } | ||
| kc, err := kubernetes.NewForConfig(clusterConfig) | ||
| if err != nil { | ||
| panic(err) |
| return resp, nil | ||
| } | ||
| timeout *= time.Duration(2) | ||
| log.Printf("Retrying request to %v in %v", req.URL.Host, timeout) |
| log.Println("Sending a proxy request to", targetURL) | ||
| target, err := url.Parse(targetURL) | ||
| if err != nil { | ||
| panic(err.Error()) |
glog.Fatalf, although I'm worried about the implications of that.
| destinationWeights = append(destinationWeights, | ||
| istiov1alpha2.DestinationWeight{ | ||
| Destination: istiov1alpha2.IstioService{ | ||
| Name: route.Service, |
https://istio.io/docs/reference/config/istio.routing.v1alpha1.html#IstioService
I see a "namespace" here?
ah, thanks for the pointer, that may help. I'll try it in the next change.
mattmoor
left a comment
Just to manage my own expectations: this is WIP, and some of the TODOs will be done before this goes in, right?
If so, let's change the title to "[WIP] <title>" so it's clear to reviewers, and remove the prefix when we think it's complete.
Does that SGTY?
|
|
||
| a, err := activator.NewActivator(kubeClient, elaClient) | ||
| if err != nil { | ||
| glog.Fatal(err) |
| srcs = [ | ||
| "activator.go", | ||
| "retryingtripper.go", | ||
| ], |
nit: buildify your BUILD files.
| } | ||
| // finally, one more retry with the default timeout | ||
| return http.DefaultTransport.RoundTrip(req) | ||
| // NewActivator returns an activator. |
| target, err := url.Parse(targetURL) | ||
| if err != nil { | ||
| panic(err.Error()) | ||
| glog.Fatal(err.Error()) |
Can we avoid glog.Fatalf and panic in libraries? I'd expect this to propagate the error and for the caller to handle it appropriately.
Especially since Activator is a shared component, I worry about one tenant being able to DoS other tenants by crash looping the activator pods.
| revisionClient := a.elaClient.ElafrosV1alpha1().Revisions("default") | ||
| // TODO: determine the revision to be activated. | ||
| revision, err := revisionClient.Get("p-d22006ac-ef13-45cb-ae6a-069dfe2c81a5", metav1.GetOptions{}) | ||
| revision, err := revisionClient.Get("p-c106f1fa-082e-41ac-a506-a9c1016dec77", metav1.GetOptions{}) |
um.... LOL. I didn't notice this before.
| // A layer on top of http.DefaultTransport, with retries. | ||
| // Forked from https://github.com/fission/fission/blob/746c51901da590cff09317dbe59aa19241211812/router/functionHandler.go#L53 | ||
| type RetryingRoundTripper struct { | ||
| maxRetries int |
|
|
||
| func (rrt RetryingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { | ||
| timeout := rrt.initialTimeout | ||
| transport := http.DefaultTransport.(*http.Transport) |
Please construct the above with an inner transport vs. assuming DefaultTransport.
Do you mean adding a transport field to the RetryingRoundTripper struct?
I think RetryingRoundTripper should have an inner http.RoundTripper
| transport.DialContext = (&net.Dialer{ | ||
| Timeout: timeout, | ||
| KeepAlive: 30 * time.Second, | ||
| }).DialContext |
Whoa, tampering with the settings of the DefaultTransport feels super sketchy.
I also don't like that we can't operate on an http.RoundTripper and need an *http.Transport.
Is there a reason that the timeout manipulation needs to involve DialContext?
It seems the timeout is a field of the net.Dialer used by transport.DialContext?
| if err == nil { | ||
| return resp, nil | ||
| } | ||
| timeout *= time.Duration(2) |
I think this is doubling the timeout each iteration, but going through time.Duration(2) makes this harder to read than simply writing timeout = timeout + timeout. WDYT?
| } | ||
| timeout *= time.Duration(2) | ||
| glog.Info("Retrying request to %v in %v", req.URL.Host, timeout) | ||
| time.Sleep(timeout) |
This is actually how I would have expected the timeout logic to creep up over time. See my question above about DialContext.
| image: ela-activator:latest | ||
| ports: | ||
| - name: http | ||
| containerPort: 8080 No newline at end of file |
If this single Pod fails or is evicted, which happens regularly (e.g. during cluster resizing), it will not be restarted. Instead of creating a single Pod, you should create a Deployment scaled to 1, which ensures the correct number of Pods remains running.
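A Deployment wrapping the same container might look like this sketch (the apiVersion and labels are assumptions, not from the PR):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ela-activator
spec:
  replicas: 1            # the Deployment keeps one Pod running at all times
  selector:
    matchLabels:
      app: ela-activator
  template:
    metadata:
      labels:
        app: ela-activator
    spec:
      containers:
      - name: ela-activator
        image: ela-activator:latest
        ports:
        - name: http
          containerPort: 8080
```

If the Pod is evicted, the Deployment's ReplicaSet controller immediately schedules a replacement.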
| a, err := activator.NewActivator(kubeClient, elaClient) | ||
| if err != nil { | ||
| glog.Fatal(err) | ||
| glog.Fatalf("Failed to create an activator: %s", err.Error()) |
Drop the .Error() and switch %s to %v
| glog.Info("Sending a proxy request to ", targetURL) | ||
| target, err := url.Parse(targetURL) | ||
| if err != nil { | ||
| glog.Errorf("Failed to parse target URL: %s. Error: %s", targetURL, err.Error()) |
Again, you don't need .Error(), use %v
| Destination: istiov1alpha2.IstioService{ | ||
| // TODO: update activator-service namespace to be ela-system. | ||
| Name: "activator-service.default.svc.cluster.local", | ||
| Name: controller.GetElaK8SActivatorServiceName(), |
| // Construct a hostname that the ingress accepts traffic for. | ||
| hostRule := u.Spec.DomainSuffix | ||
|
|
||
| ns := namespace |
| if err != nil { | ||
| glog.Info("Failed to update the revision: ", revision) | ||
| } | ||
| _, _ = a.retry(updateRevisionMaxRetries, 50*time.Millisecond, revision, a.updateRevision) |
| case v1alpha1.RevisionServingStateActive: | ||
| // The revision is already active. Forward the request to k8s deployment. | ||
| serviceURL, err := a.getRevisionTargetURL(revision) | ||
| serviceURL, err := a.retry(getTargetURLMaxRetries, 50*time.Millisecond, revision, a.getRevisionTargetURL) |
This should be done via Watch.
| glog.Info("End proxy request") | ||
| } | ||
|
|
||
| func (a *Activator) retry(maxRetries int, sleep time.Duration, revision *v1alpha1.Revision, runner func(*v1alpha1.Revision) (string, error)) (string, error) { |
Can we drop the retry logic from the first cut of this, and add it back once the base design is in place?
It is my expectation that the core of this will make use of Watch and shouldn't need retries. Right now it's using retry to poll IIUC, and that is more expensive and in my experience slower.
+1. The retries were originally part of making this work with Fission and maybe they aren't needed. Let's test it first.
| revisionClient := a.elaClient.ElafrosV1alpha1().Revisions("default") | ||
| // TODO: determine the revision to be activated. | ||
| revision, err := revisionClient.Get("p-c106f1fa-082e-41ac-a506-a9c1016dec77", metav1.GetOptions{}) | ||
| revision, err := revisionClient.Get("p-dd33338c-7389-4abe-8020-774c2d085d05", metav1.GetOptions{}) |
What is our plan for obtaining the revision ID?
| default: | ||
| panic("unrecognized revision serving status: " + revision.Spec.ServingState) | ||
| glog.Fatal("unrecognized revision serving status: " + revision.Spec.ServingState) | ||
| } |
This isn't exactly what I meant, but I'd like to see us drop the retry and get a solid core before they come back.
| @@ -0,0 +1 @@ | |||
| // TODO No newline at end of file | |||
You can always provide a link to the //pkg/autoscaler/README.md which explains the role of the activator. Would be better to have all the docs in one place anyway.
| func (a *Activator) getRevisionTargetURL(revision *v1alpha1.Revision) (string, error) { | ||
| services := a.kubeClient.CoreV1().Services(revision.GetNamespace()) | ||
| // k8s service name | ||
| svc, err := services.Get(revision.GetName()+"-service", metav1.GetOptions{}) |
You can use GetElaK8SServiceNameForRevision instead of hardcoding "-service": https://github.com/elafros/elafros/blob/41acb4c66c1457b23472d4b65b1ca9eac1f1df40/pkg/controller/names.go#L53
| func (a *Activator) handler(w http.ResponseWriter, r *http.Request) { | ||
| revisionClient := a.elaClient.ElafrosV1alpha1().Revisions("default") | ||
| // TODO: determine the revision to be activated. | ||
| revision, err := revisionClient.Get("p-dd33338c-7389-4abe-8020-774c2d085d05", metav1.GetOptions{}) |
| proxyRequest(w, r, serviceURL) | ||
| case v1alpha1.RevisionServingStateReserve: | ||
| // The revision is inactive. Enqueue the request and activate the revision | ||
| // TODO: check if the route traffic (route.Spec.Traffic) contains the revision |
Maybe you don't need to do this. The Route Controller (or someone) will make sure the Revision states are updated. You can just rely on the Revision states.
| glog.Info("Found revision ", revision.GetName()) | ||
| glog.Info("Start to proxy request...") | ||
| switch revision.Spec.ServingState { | ||
| case v1alpha1.RevisionServingStateActive: |
The Revision may be Active, but the Service may not have any endpoints in it. There is a delay between when the Revision.Spec is changed to Active and when the controller creates the resources and they all come online.
So I would expect that requests which arrive immediately after the first one will take this code branch, but get a 5xx error instead of reaching a Pod through the Service.
I suggest you don't worry about that right now. Just get this working. But create a separate issue to close the gap. You can either 1) retry on 5xx requests to the Service or 2) proxy all requests directly to Pods you find through labels (my preference for simplicity).
I'd recommend Watching Revisions and the Revision's Status should have a Condition reflecting the readiness of the K8s Service.
Yeah, the Spec here is not what we want to key off and Revision status should indeed be what we use to determine the actual status of k8s service.
| // TODO: check if the route traffic (route.Spec.Traffic) contains the revision | ||
| glog.Info("the revision is inactive. Activating it and enqueuing request") | ||
| revision.Spec.ServingState = v1alpha1.RevisionServingStateActive | ||
| _, _ = a.retry(updateRevisionMaxRetries, 50*time.Millisecond, revision, a.updateRevision) |
If you get a large burst of traffic you will create a lock train on the Revision by trying to update it to Active for every request. Instead, you deduplicate activation requests.
Again, just take a note and fix this later. Get it working in a happy, single-request scenario first.
| InitialTimeout time.Duration | ||
| } | ||
|
|
||
| func (rrt RetryingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { |
As mentioned above, this might not be necessary.
| // By default we map to the placeholder service directly. | ||
| // This would point to 'router' component if we wanted to use | ||
| // this method for 0->1 case. | ||
| // This would point to 'activator' component if enableActivator is true. |
Does enableActivator mean an experiment is on? Or that the revision is in Reserve state and should be routed to the Activator? The name should clarify.
The feature experiment flag is enableActivatorExperiment. By default it's false. When it's true, the traffic will be directed to activator-service. Right now the activator uses the hard-coded revision "configuration-example-00001". This will be addressed by #552.
| }, []string{"status"}) | ||
| // The experiment flag to turn on activator feature. If set to true, the traffic | ||
| // will be directed to activator-service. | ||
| enableActivatorExperiment = false |
Nit: maybe you could say "...the traffic will always be directed to the activator..." just to make it clear that this is a development checkpoint. And you should probably link to an issue that tracks the work to route traffic away from the activator when the revision is in the active state.
|
This is a nice checkpoint for development. The activator isn't currently part of the serving path, so checking this in doesn't disrupt anything. I saw it successfully activate a revision so we know it works. :) |
| ) | ||
|
|
||
| func main() { | ||
| glog.Info("Starting the elafros activator...") |
flag.Parse() before using glog. Otherwise, you will get error messages prefixed into your logs.
| glog.Info("Found revision ", revision.GetName()) | ||
| glog.Info("Start to proxy request...") | ||
| switch revision.Spec.ServingState { | ||
| case v1alpha1.RevisionServingStateActive: |
Do we plan to add throttling in case of steep spikes in the calls? We probably don't want to queue thousands of requests per route. This will also come in handy in case we have problems scaling up or even creating the revision.
@mdemirhan, yes! We really need to throttle during normal service as well. I opened #602 to track this work.
| func makeIstioRouteSpec(u *v1alpha1.Route, tt *v1alpha1.TrafficTarget, ns string, routes []RevisionRoute, domain string) istiov1alpha2.RouteRuleSpec { | ||
| destinationWeights := []istiov1alpha2.DestinationWeight{} | ||
| placeHolderK8SServiceName := controller.GetElaK8SServiceName(u) | ||
| // if either current or next is inactive, target them to proxy instead of |
This comment seems a bit out of date now?
| // The experiment flag to turn on activator feature. If set to true, the traffic | ||
| // will always be directed to the activator. | ||
| enableActivatorExperiment = false | ||
| ) |
Is the only way to enable this to recompile? Could we make this into a flag so that 'easy elafros' would allow folks to play with this feature.
| if err == nil { | ||
| return resp, nil | ||
| } | ||
| timeout = timeout + timeout |
Should probably use an exponential backoff here?
Use deployment for activator, instead of a pod
Use a flag for activator feature flag enableActivatorExperiment
Add appendHeaders blob to istio route rule definition
| ) | ||
|
|
||
| // FakeRetryingRoundTripper implements RetryingRoundTripperInterface | ||
| type FakeRetryingRoundTripper struct{} |
Let's rip out the RetryingRoundTripper. This was leftover from a previous 0-to-1 scaling strategy that used warm pods. It was necessary to probe for readiness through aggressive retries. But we have a better signal in service readiness and actual readiness checks.
To make sure the requests are forwarded only when the service is ready, we will establish a watch on the service to look for the ready condition. @vaikas-google was talking about this.
| proxyRequest(w, r, serviceURL, a.retryingTripper) | ||
| case v1alpha1.RevisionServingStateReserve: | ||
| // The revision is inactive. Enqueue the request and activate the revision | ||
| glog.Info("the revision is inactive. Activating it and enqueuing reqfretryinguest") |
"reqfretryinguest" ???
| glog.Info("The revision is active. Forwarding request to service at ", serviceURL) | ||
| proxyRequest(w, r, serviceURL, a.retryingTripper) | ||
| case v1alpha1.RevisionServingStateReserve: | ||
| // The revision is inactive. Enqueue the request and activate the revision |
We don't actually enqueue the request. We just proxy it immediately. If you don't want to setup the service watch now, open an issue to track that work and link it in here.
I would actually prefer that you left that for a separate issue so more than one person can work on this.
| }) | ||
|
|
||
| appendHeaders := make(map[string]string) | ||
| appendHeaders["revision"] = "configuration-example-00001" |
You will need to remove hardcoded revisions before committing this.
I believe the plan is to use Istio's appendHeaders feature to have it explicitly communicate the destination. But that's broken, so for now, just lookup the necessary information to figure out which revision this was intended for (as a temporary hack).
| name: ela-activator | ||
| namespace: ela-system | ||
| spec: | ||
| replicas: 1 |
This feels like it should be replicated (maybe 2-3?) to avoid being a SPOF and dropping requests when a single replica is evicted.
I don't expect it for this change, but I'd like to better understand what our provisioning and autoscaling strategy for this will be longer term. Replicas in some proportion to cluster size feels right to me, but is a total WAG. Can we open an issue to track this? cc @josephburnett
| endpoint := obj.(*corev1.Endpoints) | ||
| eName := endpoint.Name | ||
| namespace := endpoint.Namespace | ||
| if namespace == "ela-system" { |
I don't understand why we need this?
| if err != nil { | ||
| return | ||
| } | ||
| wi, err := a.elaClient.ElafrosV1alpha1().Revisions(revision.GetNamespace()).Watch(metav1.ListOptions{ |
I believe that this will result in us starting a watch per request, which still feels expensive (tho less so than our prior polling!).
I think that we should change the activator to create a single informer that watches all revisions and flushes the buffered requests for each revision as it becomes ready.
That said, I would be fine with checkpointing as-is, and following-up (open an issue to track), since it should be an implementation detail / optimization of this package.
Right now, it's one watch object per unique revision, since there is a pendingRequests map in process().
// Add a watch for each unique revision
It's natural to group requests per unique revision -- the revisions won't be ready at the same time.
You can still group requests by Revision and have a single watch over all Revisions.
| doneCh: make(chan bool), | ||
| } | ||
| a.chans.revisionRequestCh <- revRequest | ||
| <-revRequest.doneCh |
This is a boolean channel, but you aren't checking what comes through?
As you suggested, changed to chan struct{}
| case revToWatch := <-a.chans.watchCh: | ||
| go a.watchForReady(revToWatch) | ||
| case revToActivate := <-a.chans.activateCh: | ||
| go a.activate(revToActivate) |
These are asynchronous, so process() could mutate the RevisionRequest while it is being processed, right?
Do you think I should use sync.Map?
https://golang.org/pkg/sync/#Map
The map isn't what's concurrently accessed, the element is.
| activationDoneCh chan (string) | ||
| revisionRequestCh chan (RevisionRequest) | ||
| watchCh chan (string) | ||
| } |
I did a pass over this, and tried to grok enough of the flow to meaningfully review, but the volume of chan usage makes this particularly difficult to follow. I also have concerns around the thread safety and the potential for leaking go routines.
Consider looking in the golang sync package for abstractions that could be useful. cc @grantr may also have thoughts.
@josephburnett I'll defer to your judgement on whether we want this to go in as-is, but I'm concerned about the complexity of this. I can't confidently say that this has no races, deadlocks, leaks, nor that I'd be able to debug it if it fails, which concerns me for such a critical component. That said, I can appreciate a desire to checkpoint a functional feature, where the complexity is hidden inside of a reasonably defined library.
Even if this goes in as-is, we should seriously consider ways to refactor the internals here to better avoid channels (not entirely, but we can't need 6+), perhaps by better encapsulating parts of this logic in outlined datatypes or methods.
Ack. I would like this to go in as-is so that Nicole and I can start iterating on it. It is too complex and not production-ready. But it's behind an experiment flag and it works well enough to demo.
We will simplify this over the next two weeks as we fine tune the code and load test it.
| return arr[0], arr[1], nil | ||
| } | ||
|
|
||
| func (a *Activator) getRevisionTargetURL(revision *v1alpha1.Revision) (string, error) { |
FWIW, I might have had an easier time following the use of channels if a lot of these helper methods were split into another file in the package.
| if err != nil { | ||
| glog.Errorf("Failed to parse target URL: %s. Error: %v", serviceURL, err) | ||
| http.Error(revRequest.w, "Failed to forward request.", http.StatusBadRequest) | ||
| return |
It feels like these paths should also be signaling that they are done?
|
|
||
| appendHeaders := make(map[string]string) | ||
| if len(u.Status.Traffic) > 0 { | ||
| appendHeaders[controller.GetRevisionHeaderName()] = u.Status.Traffic[0].RevisionName |
Hmm... What if len(u.Status.Traffic) == 2?
I added a TODO for this. We will handle this later #693.
| glog.Info("Started Activator") | ||
| go a.process() | ||
| quitCh := make(chan struct{}) | ||
| go a.process(quitCh) |
Why can't we pass stopCh here?
I was worried that a second use of stopCh would introduce an error: if process() popped the element off it, Run wouldn't know we had received the stop signal. Please let me know if that's not the case.
| appendHeaders := make(map[string]string) | ||
| if len(u.Status.Traffic) > 0 { | ||
| // TODO: Deal with the case when the route has more than one traffic targets. | ||
| // https://github.com/elafros/elafros/issues/693 |
It feels worth noting inline that this depends on an Istio feature.
| if len(svc.Spec.Ports) != 1 { | ||
| return "", fmt.Errorf("need just one port. Found %v ports", len(svc.Spec.Ports)) | ||
| ip := endpoint.Subsets[0].Addresses[0].IP | ||
| port := endpoint.Subsets[0].Ports[0].Port |
This change wholly eliminates any load balancing we'd get if the backing service scaled up, which concerns me. This too should have an issue tracking it.
| proxy := httputil.NewSingleHostReverseProxy(target) | ||
| func (a *Activator) proxyRequest(revRequest RevisionRequest, serviceURL *url.URL) { | ||
| glog.Infof("Sending a proxy request to %q", serviceURL) | ||
| proxy := httputil.NewSingleHostReverseProxy(serviceURL) |
Your refactoring has highlighted that this is being called inside of a loop. I have no sense for how complex this constructor is, which worries me a little bit.
| func (a *Activator) process() { | ||
| pendingRequests := make(map[string][]RevisionRequest) | ||
| func (a *Activator) process(quitCh chan struct{}) { | ||
| // TODO: https://golang.org/pkg/sync/#Map |
IDK what this TODO means, since you're using it below?
| pendingRequests := make(map[string][]RevisionRequest) | ||
| func (a *Activator) process(quitCh chan struct{}) { | ||
| // TODO: https://golang.org/pkg/sync/#Map | ||
| var pendingRequests sync.Map //map[string][]RevisionRequest |
This comment is to show what type of map it is, for readability.
| const ( | ||
| requestQueueLength = 100 | ||
| happyPath = true | ||
| sadPath = false |
|
@akyyy: The following test failed.
|
/retest |
|
/approve |
|
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: akyyy, mattmoor.
* Add activator for 0->1 case. The feature experiment flag is enableActivatorExperiment. By default it's false. When it's true, the traffic will be directed to activator-service. Right now the activator uses the hard-coded revision "configuration-example-00001". This will be addressed by knative#552.
* Update one comment
* Add unit test
* Use deployment for activator, instead of a pod
* Use a flag for activator feature flag enableActivatorExperiment
* Add appendHeaders blob to istio route rule definition
* Remove retryingtripper.go
* Add a couple of TODO links
* Watch works
* Use a queue and watch object for each unique revision
* Use channels
* Activator with channels
* fieldSelector
* Update names
* Send watch ready event to watch channel
* Delete a binary
* Merge
* Add unit tests
* Change back
* Remove debug code
* Update test since we allow configuration change based on revision change in this PR
* Address CR comments
* Add error handling channel
* Use sync.Map for pendingRequests
* Talk to endpoint directly
* Fix tests with existing endpoints
* Disable activator by default
* Delete extra comment
* Change scaleToZeroThreshold back to 5 min
* Update comments