Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pkg/reconciler/labeler/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ package labeler
import (
"context"

"k8s.io/client-go/tools/cache"

"knative.dev/serving/pkg/apis/serving"
servingclient "knative.dev/serving/pkg/client/injection/client"
configurationinformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/configuration"
revisioninformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/revision"
routeinformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/route"
routereconciler "knative.dev/serving/pkg/client/injection/reconciler/serving/v1/route"
servingreconciler "knative.dev/serving/pkg/reconciler"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
servingreconciler "knative.dev/serving/pkg/reconciler"
pkgreconciler "knative.dev/pkg/reconciler"
)

const controllerAgentName = "labeler-controller"
Expand Down Expand Up @@ -56,5 +60,10 @@ func NewController(
logger.Info("Setting up event handlers")
routeInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

configInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: pkgreconciler.LabelExistsFilterFunc(serving.RouteLabelKey),
Handler: controller.HandleAll(impl.EnqueueLabelOfNamespaceScopedResource("", serving.RouteLabelKey)),
})
Comment thread
tanzeeb marked this conversation as resolved.

return impl
}
48 changes: 46 additions & 2 deletions pkg/reconciler/labeler/labeler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,25 @@ func TestReconcile(t *testing.T) {
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "first-reconcile"),
},
Key: "default/first-reconcile",
}, {
Name: "label pinned revision",
Objects: []runtime.Object{
simpleRoute("default", "pinned-revision", "the-revision"),
simpleConfig("default", "the-config"),
rev("default", "the-config"),
rev("default", "the-config", WithRevName("the-revision")),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchAddFinalizerAction("default", "pinned-revision"),
patchAddLabel("default", "the-revision",
"serving.knative.dev/route", "pinned-revision"),
patchAddLabel("default", "the-config",
"serving.knative.dev/route", "pinned-revision"),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "pinned-revision"),
},
Key: "default/pinned-revision",
Comment thread
tanzeeb marked this conversation as resolved.
}, {
Name: "steady state",
Objects: []runtime.Object{
Expand Down Expand Up @@ -161,6 +180,23 @@ func TestReconcile(t *testing.T) {
patchAddLabel("default", "new-config", "serving.knative.dev/route", "config-change"),
},
Key: "default/config-change",
}, {
Name: "update configuration",
Objects: []runtime.Object{
simpleRunLatest("default", "config-update", "the-config", WithRouteFinalizer),
simpleConfig("default", "the-config",
WithLatestCreated("the-config-ecoge"),
WithConfigLabel("serving.knative.dev/route", "config-update")),
rev("default", "the-config",
WithRevisionLabel("serving.knative.dev/route", "config-update")),
rev("default", "the-config",
WithRevName("the-config-ecoge")),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchAddLabel("default", "the-config-ecoge",
"serving.knative.dev/route", "config-update"),
},
Key: "default/config-update",
}, {
Name: "delete route",
Objects: []runtime.Object{
Expand Down Expand Up @@ -248,7 +284,15 @@ func routeWithTraffic(namespace, name string, traffic v1.TrafficTarget, opts ...

func simpleRunLatest(namespace, name, config string, opts ...RouteOption) *v1.Route {
return routeWithTraffic(namespace, name, v1.TrafficTarget{
RevisionName: config + "-dbnfd",
RevisionName: config + "-dbnfd",
Percent: ptr.Int64(100),
LatestRevision: ptr.Bool(true),
}, opts...)
}

func simpleRoute(namespace, name, revision string, opts ...RouteOption) *v1.Route {
return routeWithTraffic(namespace, name, v1.TrafficTarget{
RevisionName: revision,
Percent: ptr.Int64(100),
}, opts...)
}
Expand Down Expand Up @@ -276,7 +320,7 @@ func rev(namespace, name string, opts ...RevisionOption) *v1.Revision {
rev := &v1.Revision{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: cfg.Status.LatestCreatedRevisionName,
Name: cfg.Status.LatestReadyRevisionName,
ResourceVersion: "v1",
OwnerReferences: []metav1.OwnerReference{*kmeta.NewControllerRef(cfg)},
},
Expand Down
13 changes: 13 additions & 0 deletions pkg/reconciler/labeler/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,22 @@ func (c *Reconciler) syncLabels(ctx context.Context, r *v1.Route) error {
return err
}
revisions.Insert(tt.RevisionName)

// If the owner reference is a configuration, add it to the list of configurations
owner := metav1.GetControllerOf(rev)
if owner != nil && owner.Kind == "Configuration" {
configs.Insert(owner.Name)

// If we are tracking the latest revision, add the latest created revision as well
// so that there is a smooth transition when the new revision becomes ready.
if tt.LatestRevision != nil && *tt.LatestRevision {
config, err := c.configurationLister.Configurations(r.Namespace).Get(owner.Name)
if err != nil {
return err
}

revisions.Insert(config.Status.LatestCreatedRevisionName)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vagababov @markusthoemmes it is tempting to also label the LCR with the LRR in this case so that the KPA gets a hint about how big to make the LCR before it is receiving traffic 🤔

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(not for this PR)

}
}
}

Expand Down
109 changes: 89 additions & 20 deletions test/e2e/minscale_readiness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ limitations under the License.
package e2e

import (
"fmt"
"strconv"
"testing"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/pkg/test/logstream"
Expand All @@ -42,30 +44,29 @@ func TestMinScale(t *testing.T) {

clients := Setup(t)

name := test.ObjectNameForTest(t)

names := test.ResourceNames{
Config: name,
Route: name,
// Config and Route have different names to avoid false positives
Config: test.ObjectNameForTest(t),
Route: test.ObjectNameForTest(t),
Image: "helloworld",
}

test.CleanupOnInterrupt(func() { test.TearDown(clients, names) })
defer test.TearDown(clients, names)

t.Log("Creating configuration")
if _, err := v1a1test.CreateConfiguration(t, clients, names, withMinScale(minScale)); err != nil {
cfg, err := v1a1test.CreateConfiguration(t, clients, names, withMinScale(minScale))
if err != nil {
t.Fatalf("Failed to create Configuration: %v", err)
}

revName := latestRevisionName(t, clients, names.Config)
deploymentName := revName + "-deployment"
privateServiceName := serverlessServicesName(t, clients, revName)
revName := latestRevisionName(t, clients, names.Config, "")
serviceName := serverlessServicesName(t, clients, revName)

// Before becoming ready, observe minScale
t.Log("Waiting for revision to scale to minScale before becoming ready")
if err := waitForDesiredScale(t, clients, privateServiceName, gte(minScale)); err != nil {
t.Fatalf("The deployment %q did not scale >= %d before becoming ready: %v", deploymentName, minScale, err)
if err := waitForDesiredScale(t, clients, serviceName, gte(minScale)); err != nil {
t.Fatalf("The revision %q did not scale >= %d before becoming ready: %v", revName, minScale, err)
}

// Revision becomes ready
Expand All @@ -78,8 +79,8 @@ func TestMinScale(t *testing.T) {

// Without a route, ignore minScale
t.Log("Waiting for revision to scale below minScale after becoming ready")
if err := waitForDesiredScale(t, clients, privateServiceName, lt(minScale)); err != nil {
t.Fatalf("The deployment %q did not scale < minScale after becoming ready: %v", deploymentName, err)
if err := waitForDesiredScale(t, clients, serviceName, lt(minScale)); err != nil {
t.Fatalf("The revision %q did not scale < minScale after becoming ready: %v", revName, err)
}

// Create route
Expand All @@ -98,8 +99,42 @@ func TestMinScale(t *testing.T) {

// With a route, observe minScale
t.Log("Waiting for revision to scale to minScale after creating route")
if err := waitForDesiredScale(t, clients, privateServiceName, gte(minScale)); err != nil {
t.Fatalf("The deployment %q did not scale >= %d after creating route: %v", deploymentName, minScale, err)
if err := waitForDesiredScale(t, clients, serviceName, gte(minScale)); err != nil {
t.Fatalf("The revision %q did not scale >= %d after creating route: %v", revName, minScale, err)
}

t.Log("Updating configuration")
if _, err := v1a1test.PatchConfig(clients, cfg, withEnv("FOO", "BAR")); err != nil {
t.Fatalf("Failed to update Configuration: %v", err)
}

newRevName := latestRevisionName(t, clients, names.Config, revName)
newServiceName := serverlessServicesName(t, clients, newRevName)

// After update, observe minScale in new revision
t.Log("Waiting for latest revision to scale to minScale after update")
if err := waitForDesiredScale(t, clients, newServiceName, gte(minScale)); err != nil {
t.Fatalf("The revision %q did not scale >= %d after creating route: %v", newRevName, minScale, err)
}

// Revision becomes ready
t.Log("Waiting for revision to become ready")
if err := v1a1test.WaitForRevisionState(
clients.ServingAlphaClient, newRevName, v1a1test.IsRevisionReady, "RevisionIsReady",
); err != nil {
t.Fatalf("The Revision %q did not become ready: %v", newRevName, err)
}

// After update, ensure new revision holds minScale
t.Log("Hold minScale after update")
if err := ensureDesiredScale(t, clients, newServiceName, gte(minScale)); err != nil {
t.Fatalf("The revision %q did not stay at scale >= %d after creating route: %v", newRevName, minScale, err)
}

// After update, ensure old revision ignores minScale
t.Log("Waiting for old revision to scale below minScale after being replaced")
if err := waitForDesiredScale(t, clients, serviceName, lt(minScale)); err != nil {
t.Fatalf("The revision %q did not scale < minScale after being replaced: %v", revName, err)
}
}

Expand All @@ -115,6 +150,12 @@ func lt(m int) func(int) bool {
}
}

func withEnv(name, value string) func(cfg *v1alpha1.Configuration) {
return func(cfg *v1alpha1.Configuration) {
cfg.Spec.GetTemplate().Spec.GetContainer().Env = []v1.EnvVar{{Name: name, Value: value}}
}
}

func withMinScale(minScale int) func(cfg *v1alpha1.Configuration) {
return func(cfg *v1alpha1.Configuration) {
if cfg.Spec.Template.Annotations == nil {
Expand All @@ -124,13 +165,15 @@ func withMinScale(minScale int) func(cfg *v1alpha1.Configuration) {
}
}

func latestRevisionName(t *testing.T, clients *test.Clients, configName string) string {
func latestRevisionName(t *testing.T, clients *test.Clients, configName, oldRevName string) string {
// Wait for the Config have a LatestCreatedRevisionName
if err := v1a1test.WaitForConfigurationState(
clients.ServingAlphaClient, configName,
v1a1test.ConfigurationHasCreatedRevision, "ConfigurationHasCreatedRevision",
func(c *v1alpha1.Configuration) (bool, error) {
return c.Status.LatestCreatedRevisionName != oldRevName, nil
}, "ConfigurationHasUpdatedCreatedRevision",
); err != nil {
t.Fatalf("The Configuration %q does not have a LatestCreatedRevisionName: %v", configName, err)
t.Fatalf("The Configuration %q has not updated LatestCreatedRevisionName from %q: %v", configName, oldRevName, err)
}

config, err := clients.ServingAlphaClient.Configs.Get(configName, metav1.GetOptions{})
Expand All @@ -143,6 +186,7 @@ func latestRevisionName(t *testing.T, clients *test.Clients, configName string)

func serverlessServicesName(t *testing.T, clients *test.Clients, revisionName string) string {
var privateServiceName string

if err := wait.PollImmediate(time.Second, 1*time.Minute, func() (bool, error) {
sks, err := clients.NetworkingClient.ServerlessServices.Get(revisionName, metav1.GetOptions{})
if err != nil {
Expand All @@ -156,17 +200,42 @@ func serverlessServicesName(t *testing.T, clients *test.Clients, revisionName st
}); err != nil {
t.Fatalf("Error retrieving sks %q: %v", revisionName, err)
}

return privateServiceName
}

func waitForDesiredScale(t *testing.T, clients *test.Clients, privateServiceName string, cond func(int) bool) error {
func waitForDesiredScale(t *testing.T, clients *test.Clients, serviceName string, cond func(int) bool) error {
endpoints := clients.KubeClient.Kube.CoreV1().Endpoints(test.ServingNamespace)

return wait.PollImmediate(time.Second, 1*time.Minute, func() (bool, error) {
endpoint, err := endpoints.Get(privateServiceName, metav1.GetOptions{})
return wait.PollImmediate(250*time.Millisecond, 1*time.Minute, func() (bool, error) {
endpoint, err := endpoints.Get(serviceName, metav1.GetOptions{})
if err != nil {
return false, nil
}
return cond(resources.ReadyAddressCount(endpoint)), nil
})

}

func ensureDesiredScale(t *testing.T, clients *test.Clients, serviceName string, cond func(int) bool) error {
endpoints := clients.KubeClient.Kube.CoreV1().Endpoints(test.ServingNamespace)

err := wait.PollImmediate(250*time.Millisecond, 5*time.Second, func() (bool, error) {
endpoint, err := endpoints.Get(serviceName, metav1.GetOptions{})
if err != nil {
return false, nil
}

if scale := resources.ReadyAddressCount(endpoint); !cond(scale) {
return false, fmt.Errorf("scale %d didn't meet condition", scale)
}

return false, nil
})

if err != wait.ErrWaitTimeout {
return err
}

return nil
}
16 changes: 16 additions & 0 deletions test/v1alpha1/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ func CreateConfiguration(t pkgTest.T, clients *test.Clients, names test.Resource
return clients.ServingAlphaClient.Configs.Create(config)
}

// PatchConfig patches the existing config with the provided options. Returns the latest Configuration object
func PatchConfig(clients *test.Clients, cfg *v1alpha1.Configuration, fopt ...v1alpha1testing.ConfigOption) (*v1alpha1.Configuration, error) {
newCfg := cfg.DeepCopy()

for _, opt := range fopt {
opt(newCfg)
}

patchBytes, err := duck.CreateBytePatch(cfg, newCfg)
if err != nil {
return nil, err
}

return clients.ServingAlphaClient.Configs.Patch(cfg.ObjectMeta.Name, types.JSONPatchType, patchBytes, "")
}

// PatchConfigImage patches the existing config passed in with a new imagePath. Returns the latest Configuration object
func PatchConfigImage(clients *test.Clients, cfg *v1alpha1.Configuration, imagePath string) (*v1alpha1.Configuration, error) {
newCfg := cfg.DeepCopy()
Expand Down