diff --git a/config/core/300-resources/podautoscaler.yaml b/config/core/300-resources/podautoscaler.yaml
index 5ee934e4fac2..14e2f10c1d0f 100644
--- a/config/core/300-resources/podautoscaler.yaml
+++ b/config/core/300-resources/podautoscaler.yaml
@@ -181,6 +181,9 @@ spec:
description: DesiredScale shows the current desired number of replicas for the revision.
type: integer
format: int32
+ metricsPaused:
+ description: MetricsPaused to determine whether metric scraping should be paused
+ type: boolean
metricsServiceName:
description: |-
MetricsServiceName is the K8s Service name that provides revision metrics.
diff --git a/docs/serving-api.md b/docs/serving-api.md
index 77aad5ea6a63..634a89975f78 100644
--- a/docs/serving-api.md
+++ b/docs/serving-api.md
@@ -497,6 +497,17 @@ int32
ActualScale shows the actual number of replicas for the revision.
+
+
+metricsPaused
+
+bool
+
+ |
+
+ MetricsPaused to determine whether metric scraping should be paused
+ |
+
PodScalable
diff --git a/pkg/apis/autoscaling/v1alpha1/pa_types.go b/pkg/apis/autoscaling/v1alpha1/pa_types.go
index 0cf64b692a9e..926a00b414e0 100644
--- a/pkg/apis/autoscaling/v1alpha1/pa_types.go
+++ b/pkg/apis/autoscaling/v1alpha1/pa_types.go
@@ -127,6 +127,9 @@ type PodAutoscalerStatus struct {
// ActualScale shows the actual number of replicas for the revision.
ActualScale *int32 `json:"actualScale,omitempty"`
+
+ // MetricsPaused to determine whether metric scraping should be paused
+ MetricsPaused bool `json:"metricsPaused,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/autoscaler/metrics/collector.go b/pkg/autoscaler/metrics/collector.go
index 9b7e865eb878..73b0ff51e349 100644
--- a/pkg/autoscaler/metrics/collector.go
+++ b/pkg/autoscaler/metrics/collector.go
@@ -71,6 +71,10 @@ type Collector interface {
// Watch registers a singleton function to call when a specific collector's status changes.
// The passed name is the namespace/name of the metric owned by the respective collector.
Watch(func(types.NamespacedName))
+ // Pause pauses metric collection for the given metric.
+ Pause(metric *autoscalingv1alpha1.Metric)
+ // Resume resumes metric collection for the given metric.
+ Resume(metric *autoscalingv1alpha1.Metric)
}
// MetricClient surfaces the metrics that can be obtained via the collector.
@@ -166,6 +170,28 @@ func (c *MetricCollector) Record(key types.NamespacedName, now time.Time, stat S
}
}
+func (c *MetricCollector) Pause(metric *autoscalingv1alpha1.Metric) {
+ key := types.NamespacedName{Namespace: metric.Namespace, Name: metric.Name}
+
+ c.collectionsMutex.RLock()
+ defer c.collectionsMutex.RUnlock()
+
+ if collection, exists := c.collections[key]; exists {
+ collection.setPause(true)
+ }
+}
+
+func (c *MetricCollector) Resume(metric *autoscalingv1alpha1.Metric) {
+ key := types.NamespacedName{Namespace: metric.Namespace, Name: metric.Name}
+
+ c.collectionsMutex.RLock()
+ defer c.collectionsMutex.RUnlock()
+
+ if collection, exists := c.collections[key]; exists {
+ collection.setPause(false)
+ }
+}
+
// Watch registers a singleton function to call when collector status changes.
func (c *MetricCollector) Watch(fn func(types.NamespacedName)) {
c.watcherMutex.Lock()
@@ -251,6 +277,7 @@ type (
scraper StatsScraper
lastErr error
grp sync.WaitGroup
+ paused bool
stopCh chan struct{}
}
)
@@ -297,6 +324,7 @@ func newCollection(metric *autoscalingv1alpha1.Metric, scraper StatsScraper, clo
scraper: scraper,
stopCh: make(chan struct{}),
+ paused: false,
}
key := types.NamespacedName{Namespace: metric.Namespace, Name: metric.Name}
@@ -322,6 +350,13 @@ func newCollection(metric *autoscalingv1alpha1.Metric, scraper StatsScraper, clo
continue
}
+ // do not scrape if paused
+ if c.getPaused() {
+ // send an empty stat to allow scale to zero; with the activator in the path these stats would get zeroed out anyway
+ c.record(clock.Now(), emptyStat)
+ continue
+ }
+
stat, err := scraper.Scrape(c.currentMetric().Spec.StableWindow)
if err != nil {
logger.Errorw("Failed to scrape metrics", zap.Error(err))
@@ -345,6 +380,24 @@ func (c *collection) close() {
c.grp.Wait()
}
+// setPause pauses or resumes the scraper; pausing happens when the activator is in the request path.
+func (c *collection) setPause(pause bool) {
+ c.mux.Lock()
+ defer c.mux.Unlock()
+
+ if c.paused != pause {
+ c.paused = pause
+ }
+}
+
+// getPaused reports whether the scraper is currently paused.
+func (c *collection) getPaused() bool {
+ c.mux.RLock()
+ defer c.mux.RUnlock()
+
+ return c.paused
+}
+
// updateMetric safely updates the metric stored in the collection.
func (c *collection) updateMetric(metric *autoscalingv1alpha1.Metric) {
c.mux.Lock()
diff --git a/pkg/autoscaler/metrics/collector_test.go b/pkg/autoscaler/metrics/collector_test.go
index 5ff61dd7eccc..041c581afe3e 100644
--- a/pkg/autoscaler/metrics/collector_test.go
+++ b/pkg/autoscaler/metrics/collector_test.go
@@ -640,3 +640,122 @@ func TestMetricCollectorAggregate(t *testing.T) {
t.Errorf("Stable Concurrency = %f, want: %f", got, want)
}
}
+
+func TestMetricCollectorPausing(t *testing.T) {
+ logger := TestLogger(t)
+
+ mtp := &fake.ManualTickProvider{
+ Channel: make(chan time.Time),
+ }
+ now := time.Now()
+ fc := fake.Clock{
+ FakeClock: clocktest.NewFakeClock(now),
+ TP: mtp,
+ }
+ metricKey := types.NamespacedName{Namespace: defaultNamespace, Name: defaultName}
+ const (
+ reportConcurrency = 10
+ reportRPS = 20
+
+ wantPausedConcurrency = 0
+ wantPausedRPS = 0
+ wantPausedPConcurrency = 0
+ wantPausedPRPS = 0
+
+ wantResumeRPS = 10
+ wantResumeConcurrency = 5
+ wantResumePConcurrency = 5
+ wantResumePRPS = 10
+ )
+ stat := Stat{
+ PodName: "testPod",
+ AverageConcurrentRequests: reportConcurrency,
+ RequestCount: reportRPS,
+ }
+ scraper := &testScraper{
+ s: func() (Stat, error) {
+ return stat, nil
+ },
+ }
+ factory := scraperFactory(scraper, nil)
+
+ coll := NewMetricCollector(factory, logger)
+ coll.clock = fc
+ coll.CreateOrUpdate(&defaultMetric)
+
+ // pause the collection, then check that metrics are not collected
+ coll.Pause(&defaultMetric)
+ // tick
+ for range 2 {
+ mtp.Channel <- now
+ }
+ // increment by two since 0 is included [0-1]
+ now = now.Add(time.Second)
+ fc.SetTime(now)
+ var gotRPS, gotConcurrency, panicRPS, panicConcurrency float64
+ // Poll to see that the async loop completed.
+ wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
+ gotConcurrency, panicConcurrency, _ = coll.StableAndPanicConcurrency(metricKey, now)
+ gotRPS, panicRPS, _ = coll.StableAndPanicRPS(metricKey, now)
+ return gotConcurrency == wantPausedConcurrency &&
+ panicConcurrency == wantPausedPConcurrency &&
+ gotRPS == wantPausedRPS &&
+ panicRPS == wantPausedPRPS, nil
+ })
+
+ if _, _, err := coll.StableAndPanicConcurrency(metricKey, now); err != nil {
+ t.Error("Paused: StableAndPanicConcurrency() =", err)
+ }
+ if _, _, err := coll.StableAndPanicRPS(metricKey, now); err != nil {
+ t.Error("Paused: StableAndPanicRPS() =", err)
+ }
+ if panicConcurrency != wantPausedPConcurrency {
+ t.Errorf("Paused: PanicConcurrency() = %v, want %v", panicConcurrency, wantPausedPConcurrency)
+ }
+ if panicRPS != wantPausedPRPS {
+ t.Errorf("Paused: PanicRPS() = %v, want %v", panicRPS, wantPausedPRPS)
+ }
+ if gotConcurrency != wantPausedConcurrency {
+ t.Errorf("Paused: StableConcurrency() = %v, want %v", gotConcurrency, wantPausedConcurrency)
+ }
+ if gotRPS != wantPausedRPS {
+ t.Errorf("Paused: StableRPS() = %v, want %v", gotRPS, wantPausedRPS)
+ }
+
+ coll.Resume(&defaultMetric)
+ // increment by two to cover [2-3] (4 seconds total)
+ now = now.Add(2 * time.Second)
+ fc.SetTime(now)
+ for range 2 {
+ mtp.Channel <- now
+ }
+
+ // Poll to see that the async loop completed.
+ wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
+ gotConcurrency, panicConcurrency, _ = coll.StableAndPanicConcurrency(metricKey, now)
+ gotRPS, panicRPS, _ = coll.StableAndPanicRPS(metricKey, now)
+ return gotConcurrency == wantResumeConcurrency &&
+ panicConcurrency == wantResumePConcurrency &&
+ gotRPS == wantResumeRPS &&
+ panicRPS == wantResumePRPS, nil
+ })
+
+ if _, _, err := coll.StableAndPanicConcurrency(metricKey, now); err != nil {
+ t.Error("Resumed: StableAndPanicConcurrency() =", err)
+ }
+ if _, _, err := coll.StableAndPanicRPS(metricKey, now); err != nil {
+ t.Error("Resumed: StableAndPanicRPS() =", err)
+ }
+ if panicConcurrency != wantResumePConcurrency {
+ t.Errorf("Resumed: PanicConcurrency() = %v, want %v", panicConcurrency, wantResumePConcurrency)
+ }
+ if panicRPS != wantResumePRPS {
+ t.Errorf("Resumed: PanicRPS() = %v, want %v", panicRPS, wantResumePRPS)
+ }
+ if gotConcurrency != wantResumeConcurrency {
+ t.Errorf("Resumed: StableConcurrency() = %v, want %v", gotConcurrency, wantResumeConcurrency)
+ }
+ if gotRPS != wantResumeRPS {
+ t.Errorf("Resumed: StableRPS() = %v, want %v", gotRPS, wantResumeRPS)
+ }
+}
diff --git a/pkg/reconciler/autoscaling/kpa/kpa.go b/pkg/reconciler/autoscaling/kpa/kpa.go
index 1914c9512d57..50ed61b508e9 100644
--- a/pkg/reconciler/autoscaling/kpa/kpa.go
+++ b/pkg/reconciler/autoscaling/kpa/kpa.go
@@ -101,9 +101,11 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1.
// Before we can reconcile decider and get real number of activators
// we start with default of 2.
if _, err = c.ReconcileSKS(ctx, pa, nv1alpha1.SKSOperationModeProxy, minActivators); err != nil {
+ pa.Status.MetricsPaused = false
return fmt.Errorf("error reconciling SKS: %w", err)
}
pa.Status.MarkSKSNotReady(noPrivateServiceName) // In both cases this is true.
+ pa.Status.MetricsPaused = false // unpause metrics
c.computeStatus(ctx, pa, podCounts{want: scaleUnknown}, logger)
return nil
}
@@ -111,10 +113,13 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1.
pa.Status.MetricsServiceName = sks.Status.PrivateServiceName
decider, err := c.reconcileDecider(ctx, pa)
if err != nil {
+ // unpause in case of this error
+ pa.Status.MetricsPaused = false
return fmt.Errorf("error reconciling Decider: %w", err)
}
if err := c.ReconcileMetric(ctx, pa, resolveScrapeTarget(ctx, pa)); err != nil {
+ pa.Status.MetricsPaused = false
return fmt.Errorf("error reconciling Metric: %w", err)
}
@@ -122,6 +127,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1.
// the scaleTargetRef based on it.
want, err := c.scaler.scale(ctx, pa, sks, decider.Status.DesiredScale)
if err != nil {
+ pa.Status.MetricsPaused = false
return fmt.Errorf("error scaling target: %w", err)
}
@@ -145,6 +151,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1.
podCounter := resourceutil.NewPodAccessor(c.podsLister, pa.Namespace, pa.Labels[serving.RevisionLabelKey])
ready, notReady, pending, terminating, err := podCounter.PodCountsByState()
if err != nil {
+ pa.Status.MetricsPaused = false
return fmt.Errorf("error getting pod counts: %w", err)
}
@@ -157,6 +164,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1.
sks, err = c.ReconcileSKS(ctx, pa, mode, numActivators)
if err != nil {
+ pa.Status.MetricsPaused = false
return fmt.Errorf("error reconciling SKS: %w", err)
}
// Propagate service name.
@@ -171,6 +179,27 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1.
pa.Status.MarkSKSNotReady(sks.Status.GetCondition(nv1alpha1.ServerlessServiceConditionReady).GetMessage())
}
+ var shouldPause bool
+ isSKSReady := pa.Status.GetCondition(autoscalingv1alpha1.PodAutoscalerConditionSKSReady).IsTrue()
+ // only probe the activator if the SKS is in proxy mode and ready; otherwise always unpause
+ if sks.Spec.Mode == nv1alpha1.SKSOperationModeProxy && isSKSReady {
+ r, err := c.scaler.activatorProbe(pa, c.scaler.transport)
+ if err != nil {
+ shouldPause = false
+ } else {
+ if r {
+ logger.Debug("activator probed, pausing metrics")
+ }
+ shouldPause = r
+ }
+ } else {
+ shouldPause = false
+ }
+
+ if shouldPause != pa.Status.MetricsPaused {
+ pa.Status.MetricsPaused = shouldPause
+ }
+
logger.Infof("PA scale got=%d, want=%d, desiredPods=%d ebc=%d", ready, want,
decider.Status.DesiredScale, decider.Status.ExcessBurstCapacity)
diff --git a/pkg/reconciler/autoscaling/kpa/kpa_test.go b/pkg/reconciler/autoscaling/kpa/kpa_test.go
index 107d232ff0c8..6dd03a32380c 100644
--- a/pkg/reconciler/autoscaling/kpa/kpa_test.go
+++ b/pkg/reconciler/autoscaling/kpa/kpa_test.go
@@ -176,6 +176,12 @@ func withScales(g, w int32) PodAutoscalerOption {
}
}
+func withPaused(paused bool) PodAutoscalerOption {
+ return func(pa *autoscalingv1alpha1.PodAutoscaler) {
+ pa.Status.MetricsPaused = paused
+ }
+}
+
func metricWithDiffSvc(ns, n string) *autoscalingv1alpha1.Metric {
m := metric(ns, n)
m.Spec.ScrapeTarget = "something-else"
@@ -288,12 +294,13 @@ func TestReconcile(t *testing.T) {
}
return kpa
}
- activeKPAMinScale := func(g, w int32) *autoscalingv1alpha1.PodAutoscaler {
+ activeKPAMinScale := func(g, w int32, paused bool) *autoscalingv1alpha1.PodAutoscaler {
return kpa(
testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized,
withScales(g, w), WithReachabilityReachable,
withMinScale(defaultScale), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc),
WithObservedGeneration(1),
+ withPaused(paused),
)
}
@@ -340,7 +347,7 @@ func TestReconcile(t *testing.T) {
Key: key,
Objects: []runtime.Object{
kpa(testNamespace, testRevision, WithPAMetricsService(privateSvc), WithPAStatusService(testRevision),
- withScales(0, defaultScale)),
+ withScales(0, defaultScale), withPaused(true)),
defaultSKS,
metric(testNamespace, testRevision),
defaultDeployment, defaultReady,
@@ -470,7 +477,7 @@ func TestReconcile(t *testing.T) {
Key: key,
Objects: []runtime.Object{
kpa(testNamespace, testRevision, WithScaleTargetInitialized, WithNoTraffic(noTrafficReason, "The target is not receiving traffic."),
- withScales(0, defaultScale), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc)),
+ withScales(0, defaultScale), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), withPaused(true)),
// SKS is ready here, since its endpoints are populated with Activator endpoints.
sks(testNamespace, testRevision, WithProxyMode, WithDeployRef(deployName), WithSKSReady),
metric(testNamespace, testRevision),
@@ -716,7 +723,7 @@ func TestReconcile(t *testing.T) {
kpa(testNamespace, testRevision, WithScaleTargetInitialized, withScales(0, 0),
WithNoTraffic(noTrafficReason, "The target is not receiving traffic."),
WithPASKSReady, markOld, WithPAStatusService(testRevision),
- WithPAMetricsService(privateSvc), WithObservedGeneration(1)),
+ WithPAMetricsService(privateSvc), WithObservedGeneration(1), withPaused(true)),
sks(testNamespace, testRevision, WithDeployRef(deployName), WithProxyMode, WithSKSReady),
metric(testNamespace, testRevision),
deploy(testNamespace, testRevision, func(d *appsv1.Deployment) {
@@ -732,7 +739,7 @@ func TestReconcile(t *testing.T) {
kpa(testNamespace, testRevision, WithScaleTargetInitialized, withScales(0, 0),
WithNoTraffic(noTrafficReason, "The target is not receiving traffic."),
WithPASKSReady, markOld, WithPAStatusService(testRevision),
- WithPAMetricsService(privateSvc), WithObservedGeneration(1)),
+ WithPAMetricsService(privateSvc), WithObservedGeneration(1), withPaused(true)),
sks(testNamespace, testRevision, WithDeployRef(deployName), WithProxyMode, WithSKSReady),
metric(testNamespace, testRevision),
deploy(testNamespace, testRevision),
@@ -761,7 +768,7 @@ func TestReconcile(t *testing.T) {
WithPASKSReady, WithPAMetricsService(privateSvc),
WithNoTraffic(noTrafficReason, "The target is not receiving traffic."),
WithPAStatusService(testRevision), WithPAMetricsService(privateSvc),
- WithObservedGeneration(1)),
+ WithObservedGeneration(1), withPaused(true)),
}},
WantUpdates: []clientgotesting.UpdateActionImpl{{
Object: defaultProxySKS,
@@ -796,7 +803,7 @@ func TestReconcile(t *testing.T) {
Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, WithPASKSReady, WithPAMetricsService(privateSvc),
WithNoTraffic("TimedOut", "The target could not be activated."), withScales(1, 0),
WithPAStatusService(testRevision), WithPAMetricsService(privateSvc),
- WithObservedGeneration(1)),
+ WithObservedGeneration(1), withPaused(true)),
}},
WantUpdates: []clientgotesting.UpdateActionImpl{{
Object: defaultProxySKS,
@@ -862,7 +869,7 @@ func TestReconcile(t *testing.T) {
Ctx: context.WithValue(context.Background(), deciderKey{},
decider(testNamespace, testRevision, defaultScale, 0 /* ebc */)),
Objects: append([]runtime.Object{
- activeKPAMinScale(underscale, defaultScale), underscaledDeployment,
+ activeKPAMinScale(underscale, defaultScale, false), underscaledDeployment,
defaultSKS, defaultMetric,
}, underscaledReady...),
WantPatches: []clientgotesting.PatchActionImpl{
@@ -889,7 +896,7 @@ func TestReconcile(t *testing.T) {
minScalePatch,
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
- Object: activeKPAMinScale(overscale, defaultScale),
+ Object: activeKPAMinScale(overscale, defaultScale, false),
}},
}, {
Name: "overscaled, PA activating",
@@ -909,7 +916,7 @@ func TestReconcile(t *testing.T) {
minScalePatch,
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
- Object: activeKPAMinScale(overscale, defaultScale),
+ Object: activeKPAMinScale(overscale, defaultScale, false),
}},
}, {
Name: "over maxScale for real, PA active",
@@ -919,7 +926,7 @@ func TestReconcile(t *testing.T) {
decider(testNamespace, testRevision, overscale, /*want more than minScale*/
0 /* ebc */)),
Objects: append([]runtime.Object{
- activeKPAMinScale(overscale, overscale), overscaledDeployment,
+ activeKPAMinScale(overscale, overscale, false), overscaledDeployment,
sks(testNamespace, testRevision, WithDeployRef(deployName), WithSKSReady, WithNumActivators(scaledAct)),
defaultMetric,
}, overscaledReady...),
@@ -931,7 +938,7 @@ func TestReconcile(t *testing.T) {
decider(testNamespace, testRevision, 1, /*less than minScale*/
0 /* ebc */)),
Objects: append([]runtime.Object{
- activeKPAMinScale(overscale, overscale), overscaledDeployment,
+ activeKPAMinScale(overscale, overscale, false), overscaledDeployment,
sks(testNamespace, testRevision, WithDeployRef(deployName), WithSKSReady, WithNumActivators(scaledAct)),
defaultMetric,
}, overscaledReady...),
@@ -939,7 +946,7 @@ func TestReconcile(t *testing.T) {
minScalePatch,
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
- Object: activeKPAMinScale(overscale, defaultScale),
+ Object: activeKPAMinScale(overscale, defaultScale, false),
}},
}, {
Name: "scaled-to-0-no-scale-data",
@@ -950,7 +957,7 @@ func TestReconcile(t *testing.T) {
Objects: []runtime.Object{
kpa(testNamespace, testRevision, WithScaleTargetInitialized, WithPASKSReady,
WithNoTraffic(noTrafficReason, "The target is not receiving traffic."), WithPAMetricsService(privateSvc),
- withScales(0, -1), WithPAStatusService(testRevision), WithObservedGeneration(1)),
+ withScales(0, -1), WithPAStatusService(testRevision), WithObservedGeneration(1), withPaused(true)),
sks(testNamespace, testRevision, WithDeployRef(deployName),
WithProxyMode, WithSKSReady),
metric(testNamespace, testRevision),
@@ -965,7 +972,7 @@ func TestReconcile(t *testing.T) {
Objects: []runtime.Object{
kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic,
markScaleTargetInitialized, WithPAMetricsService(privateSvc),
- withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)),
+ withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1), withPaused(true)),
defaultProxySKS,
metric(testNamespace, testRevision),
defaultDeployment, defaultReady,
@@ -989,6 +996,11 @@ func TestReconcile(t *testing.T) {
Object: sks(testNamespace, testRevision, WithSKSReady,
WithDeployRef(deployName), WithProxyMode, WithNumActivators(minActivators+1)),
}},
+ WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized,
+ WithPAMetricsService(privateSvc), withScales(1, defaultScale),
+ WithPAStatusService(testRevision), WithObservedGeneration(1), withPaused(true)),
+ }},
}, {
Name: "traffic decreased, now we have enough burst capacity",
Key: key,
@@ -998,7 +1010,7 @@ func TestReconcile(t *testing.T) {
Objects: []runtime.Object{
kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized,
WithPAMetricsService(privateSvc), withScales(1, defaultScale),
- WithPAStatusService(testRevision), WithObservedGeneration(1)),
+ WithPAStatusService(testRevision), WithObservedGeneration(1), withPaused(true)),
defaultProxySKS,
metric(testNamespace, testRevision),
defaultDeployment,
@@ -1007,6 +1019,11 @@ func TestReconcile(t *testing.T) {
WantUpdates: []clientgotesting.UpdateActionImpl{{
Object: defaultSKS,
}},
+ WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized,
+ WithPAMetricsService(privateSvc), withScales(1, defaultScale),
+ WithPAStatusService(testRevision), WithObservedGeneration(1)),
+ }},
}, {
Name: "initial scale > minScale, have not reached initial scale, PA still activating",
Key: key,
@@ -1016,7 +1033,8 @@ func TestReconcile(t *testing.T) {
Objects: append([]runtime.Object{
kpa(testNamespace, testRevision, WithPASKSReady, WithBufferedTraffic,
withScales(defaultScale, defaultScale), WithReachabilityReachable,
- withMinScale(defaultScale), withInitialScale(20), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc)),
+ withMinScale(defaultScale), withInitialScale(20), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc),
+ withPaused(true)),
sks(testNamespace, testRevision, WithDeployRef(deployName), WithProxyMode, WithSKSReady, WithNumActivators(scaledAct)),
defaultMetric, defaultDeployment,
}, makeReadyPods(defaultScale, testNamespace, testRevision)...),
@@ -1024,7 +1042,7 @@ func TestReconcile(t *testing.T) {
Object: kpa(testNamespace, testRevision, WithPASKSReady, WithBufferedTraffic,
withScales(defaultScale, 20), WithReachabilityReachable,
withMinScale(defaultScale), withInitialScale(20), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc),
- WithObservedGeneration(1),
+ WithObservedGeneration(1), withPaused(true),
),
}},
WantPatches: []clientgotesting.PatchActionImpl{{
@@ -1041,7 +1059,7 @@ func TestReconcile(t *testing.T) {
Objects: append([]runtime.Object{
kpa(testNamespace, testRevision, WithPASKSReady, WithBufferedTraffic,
withScales(20, defaultScale), withInitialScale(20), WithReachabilityReachable,
- WithPAStatusService(testRevision), WithPAMetricsService(privateSvc),
+ WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), withPaused(true),
),
sks(testNamespace, testRevision, WithDeployRef(deployName), WithProxyMode, WithSKSReady, WithNumActivators(scaledAct)),
defaultMetric,
@@ -1052,7 +1070,7 @@ func TestReconcile(t *testing.T) {
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic,
markScaleTargetInitialized, withScales(20, 20), withInitialScale(20), WithReachabilityReachable,
- WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1),
+ WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1), withPaused(true),
),
}},
WantPatches: []clientgotesting.PatchActionImpl{{
@@ -1149,7 +1167,7 @@ func TestReconcile(t *testing.T) {
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: kpa(testNamespace, testRevision, WithTraffic, WithPASKSReady, markScaleTargetInitialized,
withScales(2, 2), WithReachabilityReachable, WithPAStatusService(testRevision),
- WithPAMetricsService(privateSvc), WithObservedGeneration(1),
+ WithPAMetricsService(privateSvc), WithObservedGeneration(1), withPaused(true),
),
}},
}}
diff --git a/pkg/reconciler/metric/controller.go b/pkg/reconciler/metric/controller.go
index 5021b03188d2..6c2d7dcb7b28 100644
--- a/pkg/reconciler/metric/controller.go
+++ b/pkg/reconciler/metric/controller.go
@@ -19,12 +19,15 @@ package metric
import (
"context"
+ "k8s.io/client-go/tools/cache"
+ "knative.dev/pkg/configmap"
+ "knative.dev/pkg/controller"
+ pkgreconciler "knative.dev/pkg/reconciler"
+ "knative.dev/serving/pkg/apis/autoscaling"
"knative.dev/serving/pkg/autoscaler/metrics"
metricinformer "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/metric"
+ painformer "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/podautoscaler"
metricreconciler "knative.dev/serving/pkg/client/injection/reconciler/autoscaling/v1alpha1/metric"
-
- "knative.dev/pkg/configmap"
- "knative.dev/pkg/controller"
)
// NewController initializes the controller and is called by the generated code.
@@ -35,15 +38,24 @@ func NewController(
collector metrics.Collector,
) *controller.Impl {
metricInformer := metricinformer.Get(ctx)
-
+ paInformer := painformer.Get(ctx)
c := &reconciler{
collector: collector,
+ paLister: paInformer.Lister(),
}
impl := metricreconciler.NewImpl(ctx, c)
// Watch all the Metric objects.
metricInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
+ // watch the PodAutoscaler for updates in case of pausing
+ onlyKPAClass := pkgreconciler.AnnotationFilterFunc(
+ autoscaling.ClassAnnotationKey, autoscaling.KPA, false /*allowUnset*/)
+ paInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
+ FilterFunc: onlyKPAClass,
+ Handler: controller.HandleAll(impl.Enqueue),
+ })
+
collector.Watch(impl.EnqueueKey)
return impl
diff --git a/pkg/reconciler/metric/metric.go b/pkg/reconciler/metric/metric.go
index bb98d120c416..12cb4fe7b1d8 100644
--- a/pkg/reconciler/metric/metric.go
+++ b/pkg/reconciler/metric/metric.go
@@ -21,16 +21,18 @@ import (
"errors"
"k8s.io/apimachinery/pkg/types"
+ pkgreconciler "knative.dev/pkg/reconciler"
autoscalingv1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1"
+ "knative.dev/serving/pkg/apis/serving"
"knative.dev/serving/pkg/autoscaler/metrics"
-
- pkgreconciler "knative.dev/pkg/reconciler"
metricreconciler "knative.dev/serving/pkg/client/injection/reconciler/autoscaling/v1alpha1/metric"
+ palisters "knative.dev/serving/pkg/client/listers/autoscaling/v1alpha1"
)
// reconciler implements controller.Reconciler for Metric resources.
type reconciler struct {
collector metrics.Collector
+ paLister palisters.PodAutoscalerLister
}
// Check that our Reconciler implements the necessary interfaces.
@@ -40,7 +42,22 @@ var (
)
// ReconcileKind implements Interface.ReconcileKind.
-func (r *reconciler) ReconcileKind(_ context.Context, metric *autoscalingv1alpha1.Metric) pkgreconciler.Event {
+func (r *reconciler) ReconcileKind(ctx context.Context, metric *autoscalingv1alpha1.Metric) pkgreconciler.Event {
+ // Check whether the owning PodAutoscaler has paused metrics, and pause or resume collection accordingly.
+ revisionName := metric.Labels[serving.RevisionLabelKey]
+ if revisionName != "" {
+ paName := revisionName
+ pa, err := r.paLister.PodAutoscalers(metric.Namespace).Get(paName)
+ if err == nil && pa.Status.MetricsPaused {
+ r.collector.Pause(metric)
+ } else {
+ r.collector.Resume(metric)
+ }
+ } else {
+ // unpause metrics to be safe
+ r.collector.Resume(metric)
+ }
+
if err := r.collector.CreateOrUpdate(metric); err != nil {
switch {
case errors.Is(err, metrics.ErrFailedGetEndpoints):
diff --git a/pkg/reconciler/metric/metric_test.go b/pkg/reconciler/metric/metric_test.go
index 147071ad7bb5..43b6d03e2d8c 100644
--- a/pkg/reconciler/metric/metric_test.go
+++ b/pkg/reconciler/metric/metric_test.go
@@ -19,6 +19,7 @@ package metric
import (
"context"
"errors"
+ "fmt"
"sync/atomic"
"testing"
"time"
@@ -36,21 +37,38 @@ import (
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
+ "knative.dev/serving/pkg/apis/autoscaling"
autoscalingv1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1"
+ "knative.dev/serving/pkg/apis/serving"
+ v1 "knative.dev/serving/pkg/apis/serving/v1"
"knative.dev/serving/pkg/autoscaler/metrics"
servingclient "knative.dev/serving/pkg/client/injection/client/fake"
metricreconciler "knative.dev/serving/pkg/client/injection/reconciler/autoscaling/v1alpha1/metric"
- _ "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/metric/fake"
-
+ filteredinformerfactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
+ _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
. "knative.dev/pkg/reconciler/testing"
+ revisionresources "knative.dev/serving/pkg/reconciler/revision/resources"
+
+ metricinformer "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/metric/fake"
+ autoscalerinformer "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/podautoscaler/fake"
. "knative.dev/serving/pkg/reconciler/testing/v1"
)
type collectorKey struct{}
func TestNewController(t *testing.T) {
- ctx, _ := SetupFakeContext(t)
+ ctx, cancel, infs := SetupFakeContextWithCancel(t, func(ctx context.Context) context.Context {
+ return filteredinformerfactory.WithSelectors(ctx, serving.RevisionUID)
+ })
+ waitInformers, err := RunAndSyncInformers(ctx, infs...)
+ if err != nil {
+ t.Fatal("Error starting up informers:", err)
+ }
+ defer func() {
+ cancel()
+ waitInformers()
+ }()
c := NewController(ctx, configmap.NewStaticWatcher(), &testCollector{})
if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
@@ -172,8 +190,10 @@ func TestReconcile(t *testing.T) {
if c := ctx.Value(collectorKey{}); c != nil {
col = c.(*testCollector)
}
+
r := &reconciler{
collector: col,
+ paLister: listers.GetPodAutoscalerLister(),
}
return metricreconciler.NewReconciler(ctx, logging.FromContext(ctx),
@@ -183,7 +203,9 @@ func TestReconcile(t *testing.T) {
}
func TestReconcileWithCollector(t *testing.T) {
- ctx, cancel, informers := SetupFakeContextWithCancel(t)
+ ctx, cancel, informers := SetupFakeContextWithCancel(t, func(ctx context.Context) context.Context {
+ return filteredinformerfactory.WithSelectors(ctx, serving.RevisionUID)
+ })
collector := &testCollector{}
@@ -206,8 +228,11 @@ func TestReconcileWithCollector(t *testing.T) {
})
m := metric("a-new", "test-metric")
+ a := kpa(m.Namespace, m.Name)
+
scs := servingclient.Get(ctx)
+ scs.AutoscalingV1alpha1().PodAutoscalers(a.Namespace).Create(ctx, a, metav1.CreateOptions{})
scs.AutoscalingV1alpha1().Metrics(m.Namespace).Create(ctx, m, metav1.CreateOptions{})
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
@@ -224,6 +249,68 @@ func TestReconcileWithCollector(t *testing.T) {
}
}
+func TestReconcilePaused(t *testing.T) {
+ ctx, cancel, informers := SetupFakeContextWithCancel(t, func(ctx context.Context) context.Context {
+ return filteredinformerfactory.WithSelectors(ctx, serving.RevisionUID)
+ })
+
+ collector := &testCollector{}
+
+ ctl := NewController(ctx, configmap.NewStaticWatcher(), collector)
+ wf, err := RunAndSyncInformers(ctx, informers...)
+ if err != nil {
+ cancel()
+ t.Fatal("RunAndSyncInformers() =", err)
+ }
+
+ var eg errgroup.Group
+ defer func() {
+ cancel()
+ wf()
+ eg.Wait()
+ }()
+
+ eg.Go(func() error {
+ return ctl.RunContext(ctx, 1)
+ })
+
+ m := metric("a-new", "test-metric")
+ a := kpa(m.Namespace, m.Name)
+ scs := servingclient.Get(ctx)
+
+ scs.AutoscalingV1alpha1().Metrics(m.Namespace).Create(ctx, m, metav1.CreateOptions{})
+ metricinformer.Get(ctx).Informer().GetIndexer().Add(m)
+
+ scs.AutoscalingV1alpha1().PodAutoscalers(a.Namespace).Create(ctx, a, metav1.CreateOptions{})
+ autoscalerinformer.Get(ctx).Informer().GetIndexer().Add(a)
+
+ if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 1*time.Second, false, func(context.Context) (bool, error) {
+ return !collector.paused.Load(), nil
+ }); err != nil {
+ t.Fatal("collector is paused, should not be paused")
+ }
+
+ // pause metrics
+ a.Status.MetricsPaused = true
+ updatePodAutoscaler(t, ctx, a)
+
+ if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 1*time.Second, false, func(context.Context) (bool, error) {
+ return collector.paused.Load(), nil
+ }); err != nil {
+ t.Fatal("collector is not paused, should be paused")
+ }
+
+ // test when empty revision label found
+ m.Labels[serving.RevisionLabelKey] = ""
+ updateMetric(t, ctx, m)
+
+ if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 1*time.Second, false, func(context.Context) (bool, error) {
+ return !collector.paused.Load(), nil
+ }); err != nil {
+ t.Fatal("collector is paused, should not be paused")
+ }
+}
+
type metricOption func(*autoscalingv1alpha1.Metric)
func failed(r, m string) metricOption {
@@ -242,11 +329,24 @@ func ready(m *autoscalingv1alpha1.Metric) {
m.Status.MarkMetricReady()
}
+func updateMetric(
+ t *testing.T,
+ ctx context.Context,
+ m *autoscalingv1alpha1.Metric,
+) {
+ t.Helper()
+ servingclient.Get(ctx).AutoscalingV1alpha1().Metrics(m.Namespace).Update(ctx, m, metav1.UpdateOptions{})
+ metricinformer.Get(ctx).Informer().GetIndexer().Update(m)
+}
+
func metric(namespace, name string, opts ...metricOption) *autoscalingv1alpha1.Metric {
m := &autoscalingv1alpha1.Metric{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
+ Labels: map[string]string{
+ serving.RevisionLabelKey: name,
+ },
},
Spec: autoscalingv1alpha1.MetricSpec{
// Doesn't really matter what is by default, but we need something, so that
@@ -260,12 +360,52 @@ func metric(namespace, name string, opts ...metricOption) *autoscalingv1alpha1.M
return m
}
+func newTestRevision(namespace, name string) *v1.Revision {
+ return &v1.Revision{
+ ObjectMeta: metav1.ObjectMeta{
+ SelfLink: fmt.Sprintf("/apis/ela/v1/namespaces/%s/revisions/%s", namespace, name),
+ Name: name,
+ Namespace: namespace,
+ Annotations: map[string]string{
+ autoscaling.ClassAnnotationKey: autoscaling.KPA,
+ },
+ Labels: map[string]string{
+ serving.ServiceLabelKey: "test-service",
+ serving.ConfigurationLabelKey: "test-service",
+ },
+ },
+ Spec: v1.RevisionSpec{},
+ }
+}
+
+func updatePodAutoscaler(
+ t *testing.T,
+ ctx context.Context,
+ pa *autoscalingv1alpha1.PodAutoscaler,
+) {
+ t.Helper()
+ servingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(pa.Namespace).Update(ctx, pa, metav1.UpdateOptions{})
+ autoscalerinformer.Get(ctx).Informer().GetIndexer().Update(pa)
+}
+
+func kpa(ns, n string) *autoscalingv1alpha1.PodAutoscaler {
+ rev := newTestRevision(ns, n)
+ kpa := revisionresources.MakePA(rev, nil)
+ kpa.Generation = 1
+ kpa.Annotations[autoscaling.ClassAnnotationKey] = "kpa.autoscaling.knative.dev"
+ kpa.Annotations[autoscaling.MetricAnnotationKey] = "concurrency"
+ kpa.Status.MetricsPaused = false
+ kpa.Status.InitializeConditions()
+ return kpa
+}
+
type testCollector struct {
metrics.Collector
createOrUpdateCalls atomic.Int32
createOrUpdateError error
deleteCalls atomic.Int32
+ paused atomic.Bool
}
func (c *testCollector) CreateOrUpdate(metric *autoscalingv1alpha1.Metric) error {
@@ -278,3 +418,11 @@ func (c *testCollector) Delete(namespace, name string) {
}
func (c *testCollector) Watch(func(types.NamespacedName)) {}
+
+func (c *testCollector) Pause(metric *autoscalingv1alpha1.Metric) {
+ c.paused.Store(true)
+}
+
+func (c *testCollector) Resume(metric *autoscalingv1alpha1.Metric) {
+ c.paused.Store(false)
+}