From 0213227a329774a63bbf6862de6f2df9a5ea1086 Mon Sep 17 00:00:00 2001 From: Cristian Magherusan-Stanciu Date: Mon, 4 May 2026 23:02:04 +0200 Subject: [PATCH 1/3] perf(concurrency): cap aggregate collect parallelism via shared semaphore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The recommendations-collection fan-out has up to four nested levels: provider → account → service|region → per-region service. Each level was independently capped, so peak goroutine counts multiplied through the tree (3 providers × 20 accounts × 30 GCP regions × 2 services = 3 600 in-flight gRPC/HTTP clients on a real multi-account deploy). On a 512 MB Lambda this exhausted memory before the work could finish — observed in dev as `signal: killed` at Max Memory Used = 512 MB after ~28s, with the runtime never reaching the 60s timeout. A single semaphore stashed on ctx now lets every leaf goroutine (the goroutine that issues the actual cloud-API call) Acquire one slot before doing IO and Release after, so aggregate concurrent IO is hard-bounded by CUDLY_MAX_PARALLELISM (default 20) regardless of how nested the dispatch is. Intermediate dispatchers (provider, account, GCP region) do NOT acquire — they only launch sub-goroutines — so no goroutine can deadlock by holding a permit while waiting for sub-permits. Mechanics: - New pkg/concurrency package: WithSharedSemaphore / SharedSemaphore / Acquire / Release helpers that read the semaphore from ctx, plus MaxParallelismFromEnv reading CUDLY_MAX_PARALLELISM (default 20). Acquire/Release are no-ops when no semaphore is on ctx — CLI tools and unit tests skip the semaphore entirely without per-call branching. - Scheduler.CollectRecommendations allocates the semaphore (size from env) and attaches it to ctx via concurrency.WithSharedSemaphore BEFORE the provider fan-out, so the wrapped ctx flows through every nested level. Logs the effective cap on entry. - Three leaf fan-outs wired: - providers/aws/recommendations/client.go::GetAllRecommendations — 5 service goroutines (EC2 / RDS / ElastiCache / OpenSearch / Redshift) - providers/azure/recommendations.go::GetRecommendations — 5 service goroutines (compute / database / cache / cosmosdb / advisor); Acquire/Release boilerplate extracted into a goService helper so the function stays under gocyclo's 10-branch gate. - providers/gcp/recommendations.go::collectRegion — 2 sub-fan-out goroutines (compute / cloudsql) per region Each leaf calls Acquire(gctx) at the top, defers Release(gctx). On Acquire failure (parent ctx cancelled mid-wait) the leaf's per-service err variable is set to ctx.Err() and the goroutine returns nil to the errgroup — preserving the documented error-isolation contract. Test mocks updated: persistCollection's UpsertRecommendations and the notification email's SendNewRecommendationsNotification both run inside CollectRecommendations after the ctx is wrapped, so their mock setups were pinning the original (unwrapped) ctx. mock.Anything is the appropriate looseness — the wrapped ctx is an implementation detail.
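For reference, the leaf pattern every wired fan-out follows (a minimal sketch; runLeaf and fetch are illustrative names, not identifiers in this change, and the real call sites are the three fan-outs listed above):

    // Illustrative sketch (not part of this change): the bracket every leaf goroutine
    // places around its cloud-API call, using the new pkg/concurrency helpers.
    package example

    import (
        "context"

        "github.com/LeanerCloud/CUDly/pkg/concurrency"
    )

    // runLeaf takes one slot from the shared semaphore on ctx (a no-op when none is
    // attached), does the IO, and gives the slot back.
    func runLeaf(ctx context.Context, fetch func(context.Context) error) error {
        if err := concurrency.Acquire(ctx); err != nil {
            // Parent ctx cancelled while waiting for a slot; callers record this as
            // their per-service error and still return nil to the errgroup.
            return err
        }
        defer concurrency.Release(ctx)
        return fetch(ctx) // the actual cloud-API call is the only IO under the permit
    }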
New pkg/concurrency tests pin: - MaxParallelismFromEnv env-knob parser (unset / positive / zero / negative / non-numeric / explicit-unset) - Acquire/Release are no-ops when no semaphore on ctx - WithSharedSemaphore returns ctx unchanged when sem is nil - 20 goroutines under cap=3 never see >3 in-flight (load-bearing) - Acquire returns ctx.Err() when cancelled mid-wait This is a smoke-test config — the user's dev deploy hit OOM with the existing per-level caps; CUDLY_MAX_PARALLELISM=20 (paired with a Lambda memory bump to 2 GB in a follow-up commit) should give the dev refresh headroom to complete. Operators can dial up further via env override. --- internal/scheduler/scheduler.go | 18 ++++ internal/scheduler/scheduler_test.go | 16 +++- pkg/concurrency/concurrency.go | 87 +++++++++++++++++ pkg/concurrency/concurrency_test.go | 120 ++++++++++++++++++++++++ pkg/go.mod | 10 +- pkg/go.sum | 4 +- providers/aws/recommendations/client.go | 32 +++++++ providers/azure/recommendations.go | 40 ++++++-- providers/gcp/go.mod | 6 +- providers/gcp/go.sum | 4 +- providers/gcp/recommendations.go | 20 ++++ 11 files changed, 329 insertions(+), 28 deletions(-) create mode 100644 pkg/concurrency/concurrency.go create mode 100644 pkg/concurrency/concurrency_test.go diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 9e6641cb..ad98e948 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -19,11 +19,13 @@ import ( "github.com/LeanerCloud/CUDly/internal/oidc" "github.com/LeanerCloud/CUDly/internal/purchase" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" "github.com/LeanerCloud/CUDly/pkg/logging" "github.com/LeanerCloud/CUDly/pkg/provider" azureprovider "github.com/LeanerCloud/CUDly/providers/azure" gcpprovider "github.com/LeanerCloud/CUDly/providers/gcp" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" ) // SchedulerConfig holds configuration for the scheduler @@ -168,6 +170,22 @@ func (s *Scheduler) CollectRecommendations(ctx context.Context) (*CollectResult, return nil, err } + // Attach a shared semaphore to ctx so every leaf goroutine in the + // recommendations-collection fan-out tree (AWS service, Azure service, + // GCP region×service) acquires one slot before issuing its cloud-API + // call and releases it after. This bounds aggregate concurrent IO across + // the whole tree at CUDLY_MAX_PARALLELISM (default 20) regardless of how + // nested the dispatch is — without it, peak concurrency multiplies + // through the nested fan-outs and can exhaust Lambda memory before work + // completes (observed with a 512 MB function in dev). Intermediate + // dispatchers (provider, account, GCP region) do NOT acquire — they only + // launch sub-goroutines — so no goroutine can deadlock by holding a + // permit while waiting for sub-permits. + maxParallelism := concurrency.MaxParallelismFromEnv() + sem := semaphore.NewWeighted(int64(maxParallelism)) + ctx = concurrency.WithSharedSemaphore(ctx, sem) + logging.Infof("Recommendations collection: aggregate parallelism cap = %d", maxParallelism) + // Collect recommendations from each enabled provider concurrently, tracking // per-provider outcomes so persistence can scope stale-row eviction to // (provider, account) pairs that actually ran. A partial collection (e.g. 
diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index fc8ee994..6c055e30 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -1176,7 +1176,11 @@ func TestScheduler_ListRecommendations_ColdStartSync(t *testing.T) { // global config. Return no enabled providers so the collect is a // no-op but still runs the persistence path. mockStore.On("GetGlobalConfig", ctx).Return(&config.GlobalConfig{EnabledProviders: []string{}}, nil) - mockStore.On("UpsertRecommendations", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) + // UpsertRecommendations runs inside CollectRecommendations, after the + // shared-semaphore is attached to ctx; the wrapped ctx is what reaches + // the persistence layer. mock.Anything keeps the assertion resilient + // to that wrap. + mockStore.On("UpsertRecommendations", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) mockStore.On("ListStoredRecommendations", ctx, mock.Anything). Return([]config.RecommendationRecord{}, nil) @@ -1189,7 +1193,7 @@ func TestScheduler_ListRecommendations_ColdStartSync(t *testing.T) { // CollectRecommendations, so seeing it prove the sync collect fired // before the store read. mockStore.AssertCalled(t, "GetGlobalConfig", ctx) - mockStore.AssertCalled(t, "UpsertRecommendations", ctx, mock.Anything, mock.Anything, mock.Anything) + mockStore.AssertCalled(t, "UpsertRecommendations", mock.Anything, mock.Anything, mock.Anything, mock.Anything) } // fanOutPerAccount bounds parallel in-flight calls to @@ -1718,7 +1722,11 @@ func TestScheduler_CollectRecommendations_WithSuccessfulRecs(t *testing.T) { // contract test cover the cancellation path). mockProvider.On("GetRecommendationsClient", mock.Anything).Return(mockRecClient, nil) mockRecClient.On("GetAllRecommendations", mock.Anything).Return(recommendations, nil) - mockEmail.On("SendNewRecommendationsNotification", ctx, mock.AnythingOfType("email.NotificationData")).Return(nil) + // SendNewRecommendationsNotification fires inside CollectRecommendations, + // after ctx has been wrapped via concurrency.WithSharedSemaphore — the + // wrapped ctx is what reaches the email sender. mock.Anything keeps the + // assertion resilient to that wrap. + mockEmail.On("SendNewRecommendationsNotification", mock.Anything, mock.AnythingOfType("email.NotificationData")).Return(nil) scheduler := &Scheduler{ config: mockStore, @@ -1733,7 +1741,7 @@ func TestScheduler_CollectRecommendations_WithSuccessfulRecs(t *testing.T) { assert.Equal(t, 1, result.Recommendations) assert.Equal(t, 500.0, result.TotalSavings) - mockEmail.AssertCalled(t, "SendNewRecommendationsNotification", ctx, mock.AnythingOfType("email.NotificationData")) + mockEmail.AssertCalled(t, "SendNewRecommendationsNotification", mock.Anything, mock.AnythingOfType("email.NotificationData")) } // Test AWS recommendations fallback to GetRecommendations when GetAllRecommendations returns empty diff --git a/pkg/concurrency/concurrency.go b/pkg/concurrency/concurrency.go new file mode 100644 index 00000000..a279c5a9 --- /dev/null +++ b/pkg/concurrency/concurrency.go @@ -0,0 +1,87 @@ +// Package concurrency provides a shared global parallelism cap for the +// recommendations-collection fan-out tree. +// +// The fan-out has up to four nested levels (provider → account → service|region +// → per-region service). 
Each level was independently capped, so peak goroutine +// counts multiplied through the tree (3 providers × 20 accounts × 30 regions × +// 2 services = thousands of in-flight gRPC/HTTP clients). On a 512 MB Lambda +// that exhausted memory before the work could finish. +// +// A single semaphore stashed on the context lets every leaf goroutine — the +// goroutine that issues the actual cloud-API call — acquire one slot before +// doing IO and release it after, so the aggregate concurrent IO count is hard- +// bounded regardless of nesting depth. Intermediate dispatchers (provider, +// account, GCP region) do NOT acquire — they only launch sub-goroutines — so +// no goroutine can deadlock by holding a permit while waiting for sub-permits. +// +// If no semaphore is attached to the context (e.g. unit tests, ambient calls +// from CLI tools), Acquire and Release are no-ops; callers don't need to +// branch on whether the semaphore is set. +package concurrency + +import ( + "context" + "os" + "strconv" + + "golang.org/x/sync/semaphore" +) + +// DefaultMaxParallelism is the default cap on aggregate concurrent leaf +// goroutines across the recommendations-collection fan-out tree. Override at +// runtime with CUDLY_MAX_PARALLELISM. +const DefaultMaxParallelism = 20 + +// MaxParallelismFromEnv reads CUDLY_MAX_PARALLELISM and returns its +// positive-integer value, falling back to DefaultMaxParallelism on unset / +// invalid / non-positive values. +func MaxParallelismFromEnv() int { + if v := os.Getenv("CUDLY_MAX_PARALLELISM"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + return n + } + } + return DefaultMaxParallelism +} + +type ctxKey struct{} + +// WithSharedSemaphore returns a context carrying sem. Goroutines spawned from +// this context (or any descendant) can acquire/release slots via Acquire and +// Release. If sem is nil the context is returned unchanged. +func WithSharedSemaphore(ctx context.Context, sem *semaphore.Weighted) context.Context { + if sem == nil { + return ctx + } + return context.WithValue(ctx, ctxKey{}, sem) +} + +// SharedSemaphore returns the semaphore stashed in ctx, or nil if none. +func SharedSemaphore(ctx context.Context) *semaphore.Weighted { + sem, _ := ctx.Value(ctxKey{}).(*semaphore.Weighted) + return sem +} + +// Acquire blocks until a slot is available on the shared semaphore in ctx and +// returns nil. Returns ctx.Err() if the wait is cancelled. If no semaphore is +// attached to ctx, Acquire is a no-op and returns nil immediately — leaf +// callers can use it unconditionally without checking. +func Acquire(ctx context.Context) error { + sem := SharedSemaphore(ctx) + if sem == nil { + return nil + } + return sem.Acquire(ctx, 1) +} + +// Release returns one slot to the shared semaphore in ctx. Always pair with a +// successful Acquire (return value nil); calling Release after a cancelled +// Acquire would corrupt the slot count. If no semaphore is attached to ctx, +// Release is a no-op. 
+func Release(ctx context.Context) { + sem := SharedSemaphore(ctx) + if sem == nil { + return + } + sem.Release(1) +} diff --git a/pkg/concurrency/concurrency_test.go b/pkg/concurrency/concurrency_test.go new file mode 100644 index 00000000..3491a337 --- /dev/null +++ b/pkg/concurrency/concurrency_test.go @@ -0,0 +1,120 @@ +package concurrency + +import ( + "context" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" +) + +// TestMaxParallelismFromEnv pins the env-knob parser semantics for +// CUDLY_MAX_PARALLELISM. +func TestMaxParallelismFromEnv(t *testing.T) { + cases := []struct { + name string + env string + want int + }{ + {"unset returns default", "", DefaultMaxParallelism}, + {"positive integer overrides", "50", 50}, + {"non-numeric falls back to default", "many", DefaultMaxParallelism}, + {"zero falls back to default", "0", DefaultMaxParallelism}, + {"negative falls back to default", "-3", DefaultMaxParallelism}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Setenv("CUDLY_MAX_PARALLELISM", tc.env) + assert.Equal(t, tc.want, MaxParallelismFromEnv()) + }) + } + + t.Run("explicit unset returns default", func(t *testing.T) { + os.Unsetenv("CUDLY_MAX_PARALLELISM") + assert.Equal(t, DefaultMaxParallelism, MaxParallelismFromEnv()) + }) +} + +// TestSharedSemaphore_NoSemaphoreOnContext verifies Acquire/Release are +// no-ops when no semaphore is attached — the documented contract that lets +// CLI tools and unit tests skip the semaphore entirely without per-call +// branching. +func TestSharedSemaphore_NoSemaphoreOnContext(t *testing.T) { + ctx := context.Background() + assert.Nil(t, SharedSemaphore(ctx)) + require.NoError(t, Acquire(ctx)) + Release(ctx) // must not panic +} + +// TestSharedSemaphore_WithNilSemaphore verifies WithSharedSemaphore returns +// the input ctx unchanged when sem is nil — defensive against accidental +// nil passes. +func TestSharedSemaphore_WithNilSemaphore(t *testing.T) { + ctx := context.Background() + assert.Equal(t, ctx, WithSharedSemaphore(ctx, nil)) +} + +// TestSharedSemaphore_BoundsConcurrency is the load-bearing contract test: +// with a cap of 3, 20 goroutines all calling Acquire/work/Release must +// never see more than 3 in-flight concurrently. Asserts peak concurrency +// observed via atomics. +func TestSharedSemaphore_BoundsConcurrency(t *testing.T) { + const cap = 3 + const goroutines = 20 + sem := semaphore.NewWeighted(cap) + ctx := WithSharedSemaphore(context.Background(), sem) + + var inflight, peak atomic.Int32 + updatePeak := func(cur int32) { + for { + p := peak.Load() + if cur <= p || peak.CompareAndSwap(p, cur) { + return + } + } + } + + var wg sync.WaitGroup + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, Acquire(ctx)) + defer Release(ctx) + cur := inflight.Add(1) + updatePeak(cur) + time.Sleep(2 * time.Millisecond) // make overlap observable + inflight.Add(-1) + }() + } + wg.Wait() + + assert.LessOrEqual(t, peak.Load(), int32(cap), + "peak concurrent in-flight goroutines must not exceed semaphore cap") + assert.GreaterOrEqual(t, peak.Load(), int32(2), + "with %d goroutines and cap %d, peak should reach at least 2 (proves goroutines genuinely overlapped)", + goroutines, cap) +} + +// TestSharedSemaphore_AcquireRespectsCancellation verifies Acquire returns +// ctx.Err() when the parent ctx is cancelled while waiting for a slot. 
+// Without this, a cancelled refresh would leak a goroutine parked +// indefinitely on Acquire. +func TestSharedSemaphore_AcquireRespectsCancellation(t *testing.T) { + sem := semaphore.NewWeighted(1) + // Pre-occupy the only slot so the second Acquire must wait. + require.NoError(t, sem.Acquire(context.Background(), 1)) + defer sem.Release(1) + + ctx, cancel := context.WithCancel(WithSharedSemaphore(context.Background(), sem)) + cancel() // cancel before Acquire even starts + + err := Acquire(ctx) + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) +} diff --git a/pkg/go.mod b/pkg/go.mod index 88badf28..3c4f40eb 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -1,8 +1,6 @@ module github.com/LeanerCloud/CUDly/pkg -go 1.23 - -toolchain go1.24.4 +go 1.25.0 // This module contains shared types, provider interfaces, and the exchange package. // The exchange package has AWS SDK dependencies for RI exchange operations. @@ -12,7 +10,10 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.26.2 github.com/aws/aws-sdk-go-v2/service/ec2 v1.251.2 github.com/aws/aws-sdk-go-v2/service/sts v1.26.6 + github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.11.1 + golang.org/x/sync v0.20.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -27,8 +28,5 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 // indirect github.com/aws/smithy-go v1.24.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/go.sum b/pkg/go.sum index ceedd991..a03f6a92 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -30,14 +30,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/providers/aws/recommendations/client.go b/providers/aws/recommendations/client.go index 5f454ee5..40c566c2 100644 --- a/providers/aws/recommendations/client.go +++ b/providers/aws/recommendations/client.go @@ -11,6 +11,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" 
"github.com/LeanerCloud/CUDly/pkg/logging" ) @@ -192,23 +193,54 @@ func (c *Client) GetAllRecommendations(ctx context.Context) ([]common.Recommenda g, gctx := errgroup.WithContext(ctx) + // Each per-service goroutine is a leaf — it issues the actual Cost + // Explorer call. Acquire bounds aggregate concurrent IO across the + // whole recommendations-collection fan-out tree at the shared + // semaphore's cap (CUDLY_MAX_PARALLELISM, default 20); Release returns + // the slot. If no semaphore is on ctx (CLI tools, unit tests), + // Acquire/Release are no-ops. See pkg/concurrency. g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + ec2Err = err + return nil + } + defer concurrency.Release(gctx) ec2Recs, ec2Err = c.GetRecommendationsForService(gctx, common.ServiceEC2) return nil }) g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + rdsErr = err + return nil + } + defer concurrency.Release(gctx) rdsRecs, rdsErr = c.GetRecommendationsForService(gctx, common.ServiceRDS) return nil }) g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + cacheErr = err + return nil + } + defer concurrency.Release(gctx) cacheRecs, cacheErr = c.GetRecommendationsForService(gctx, common.ServiceElastiCache) return nil }) g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + osErr = err + return nil + } + defer concurrency.Release(gctx) osRecs, osErr = c.GetRecommendationsForService(gctx, common.ServiceOpenSearch) return nil }) g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + redshiftErr = err + return nil + } + defer concurrency.Release(gctx) redshiftRecs, redshiftErr = c.GetRecommendationsForService(gctx, common.ServiceRedshift) return nil }) diff --git a/providers/azure/recommendations.go b/providers/azure/recommendations.go index ffb773d9..ea0c007a 100644 --- a/providers/azure/recommendations.go +++ b/providers/azure/recommendations.go @@ -12,6 +12,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" "github.com/LeanerCloud/CUDly/pkg/logging" "github.com/LeanerCloud/CUDly/providers/azure/services/cache" "github.com/LeanerCloud/CUDly/providers/azure/services/compute" @@ -70,48 +71,67 @@ func (r *RecommendationsClientAdapter) GetRecommendations(ctx context.Context, p g, gctx := errgroup.WithContext(ctx) + // Each per-service goroutine is a leaf — it issues the actual ARM / + // pricing-API call. The shared semaphore on gctx (set by the scheduler) + // bounds aggregate concurrent IO across the whole recommendations- + // collection fan-out tree at CUDLY_MAX_PARALLELISM (default 20). If no + // semaphore is attached (CLI tools, unit tests), Acquire/Release are + // no-ops. See pkg/concurrency. + // + // goService extracts the Acquire/Release boilerplate so each per- + // service block stays a single g.Go call — keeps GetRecommendations + // under the project's gocyclo gate (the per-service `if Acquire {…}` + // branches counted toward this function's cyclomatic complexity + // before extraction). + goService := func(errOut *error, fn func()) { + g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + *errOut = err + return nil + } + defer concurrency.Release(gctx) + fn() + return nil // error isolation: never propagate to errgroup + }) + } + // Compute (VM) recommendations — subscription-wide. 
if shouldIncludeService(params, common.ServiceCompute) { - g.Go(func() error { + goService(&computeErr, func() { computeClient := compute.NewClient(r.cred, r.subscriptionID, "") computeRecs, computeErr = computeClient.GetRecommendations(gctx, params) - return nil // error isolation: never propagate to errgroup }) } // Database (SQL) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceRelationalDB) { - g.Go(func() error { + goService(&dbErr, func() { dbClient := database.NewClient(r.cred, r.subscriptionID, "") dbRecs, dbErr = dbClient.GetRecommendations(gctx, params) - return nil }) } // Cache (Redis) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceCache) { - g.Go(func() error { + goService(&cacheErr, func() { cacheClient := cache.NewClient(r.cred, r.subscriptionID, "") cacheRecs, cacheErr = cacheClient.GetRecommendations(gctx, params) - return nil }) } // CosmosDB (NoSQL) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceNoSQL) { - g.Go(func() error { + goService(&cosmosErr, func() { cosmosClient := cosmosdb.NewClient(r.cred, r.subscriptionID, "") cosmosRecs, cosmosErr = cosmosClient.GetRecommendations(gctx, params) - return nil }) } // Azure Advisor adds cross-cutting cost recommendations independent of the // per-service Reservation API. Failures here are non-fatal — the per-service // results above are still useful on their own. - g.Go(func() error { + goService(&advisorErr, func() { advisorRecs, advisorErr = r.getAdvisorRecommendations(gctx, params) - return nil }) // Wait for all goroutines. g.Wait() always returns nil because every diff --git a/providers/gcp/go.mod b/providers/gcp/go.mod index 1972df7d..ea76ed0f 100644 --- a/providers/gcp/go.mod +++ b/providers/gcp/go.mod @@ -1,8 +1,6 @@ module github.com/LeanerCloud/CUDly/providers/gcp -go 1.23 - -toolchain go1.24.4 +go 1.25.0 require ( cloud.google.com/go/compute v1.23.3 @@ -14,7 +12,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 github.com/stretchr/testify v1.11.1 golang.org/x/oauth2 v0.16.0 - golang.org/x/sync v0.6.0 + golang.org/x/sync v0.20.0 google.golang.org/api v0.160.0 google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac google.golang.org/grpc v1.61.0 diff --git a/providers/gcp/go.sum b/providers/gcp/go.sum index c1ad5f99..61f0858c 100644 --- a/providers/gcp/go.sum +++ b/providers/gcp/go.sum @@ -135,8 +135,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/providers/gcp/recommendations.go b/providers/gcp/recommendations.go 
index b7b8debd..b00af976 100644 --- a/providers/gcp/recommendations.go +++ b/providers/gcp/recommendations.go @@ -13,6 +13,7 @@ import ( "google.golang.org/api/option" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" "github.com/LeanerCloud/CUDly/pkg/logging" "github.com/LeanerCloud/CUDly/providers/gcp/services/cloudsql" "github.com/LeanerCloud/CUDly/providers/gcp/services/computeengine" @@ -154,8 +155,22 @@ func (r *RecommendationsClientAdapter) collectRegion(ctx context.Context, params g, gctx := errgroup.WithContext(ctx) + // Per-(region, service) goroutines are leaves — they issue the actual + // Recommender API call. Acquire bounds aggregate concurrent IO across + // the whole recommendations-collection fan-out tree at the shared + // semaphore's cap (CUDLY_MAX_PARALLELISM, default 20); Release returns + // the slot. Without this bound the per-region fan-out (cap 10) × + // per-service sub-fan-out (2) × accounts × providers can produce + // hundreds of concurrent gRPC clients that exhaust Lambda memory. If + // no semaphore is on ctx (CLI tools, unit tests), Acquire/Release are + // no-ops. See pkg/concurrency. if shouldIncludeService(params, common.ServiceCompute) { g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + computeErr = err + return nil + } + defer concurrency.Release(gctx) client, err := computeengine.NewClient(gctx, r.projectID, region, r.clientOpts...) if err != nil { computeErr = err @@ -167,6 +182,11 @@ func (r *RecommendationsClientAdapter) collectRegion(ctx context.Context, params } if shouldIncludeService(params, common.ServiceRelationalDB) { g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + sqlErr = err + return nil + } + defer concurrency.Release(gctx) client, err := cloudsql.NewClient(gctx, r.projectID, region, r.clientOpts...) if err != nil { sqlErr = err From c9c314cad7966c9f9287d43da935b498d90ecd31 Mon Sep 17 00:00:00 2001 From: Cristian Magherusan-Stanciu Date: Mon, 4 May 2026 23:02:49 +0200 Subject: [PATCH 2/3] =?UTF-8?q?infra(aws/dev):=20bump=20Lambda=20memory=20?= =?UTF-8?q?512=20MB=20=E2=86=92=202=20GB=20for=20collect=20headroom?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The dev Lambda hit Max Memory Used = 512 MB and was killed by the runtime at ~28s (Duration 28701ms / Timeout 60s) on a user-triggered recommendations refresh. The collection fan-out (post-#266 #267 #268) keeps many concurrent gRPC/HTTP clients in flight — 30+ GCP regions × 2 services each, plus 5 AWS services and 5 Azure services — and 512 MB is too tight for that working-set even with the new shared-semaphore cap. This is a temporary headroom bump scoped to dev so the user can verify the post-#266/#267/#268/concurrency-semaphore stack actually completes end-to-end on a deployed Lambda. The shared semaphore (preceding commit) is the durable fix for peak concurrency; this just gives the function room to breathe while we observe the new behaviour live. Only github-dev.tfvars is touched. github-staging.tfvars and github-prod.tfvars stay at 1024 MB — they don't exercise every provider with mostly-broken credentials the way dev does, and bumping prod without first observing dev would be premature. dev.tfvars (the user's local-apply config, gitignored) is also bumped locally to keep parity with the CI deploy until this lands. 
--- terraform/environments/aws/github-dev.tfvars | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/terraform/environments/aws/github-dev.tfvars b/terraform/environments/aws/github-dev.tfvars index b6c06449..abcfff29 100644 --- a/terraform/environments/aws/github-dev.tfvars +++ b/terraform/environments/aws/github-dev.tfvars @@ -18,7 +18,7 @@ compute_platform = "lambda" enable_docker_build = true # Build and push image via terraform apply on the runner # Lambda Configuration -lambda_memory_size = 512 +lambda_memory_size = 2048 lambda_timeout = 60 lambda_reserved_concurrency = -1 lambda_log_retention_days = 7 From 092e0b3d90e5a955113a78f824eacecc34f78818 Mon Sep 17 00:00:00 2001 From: Cristian Magherusan-Stanciu Date: Tue, 5 May 2026 00:27:44 +0200 Subject: [PATCH 3/3] fix(concurrency,aws): address PR #270 CodeRabbit review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two CodeRabbit findings on PR #270: 1. **`pkg/concurrency/concurrency_test.go` (CRITICAL)**: `require.NoError` was called from inside the worker goroutines in `TestSharedSemaphore_BoundsConcurrency`. Testify's documented contract is that `require.*` / `FailNow`-style helpers must run on the test's own goroutine — calling them from a worker uses `runtime.Goexit` on the worker instead of safely stopping the test, which can hang or skip cleanup and produce race-detector noise. Workers now capture their Acquire result on a buffered `errCh` and the main goroutine drains the channel and asserts after `wg.Wait()`. `Release` is deferred only on a successful Acquire, matching the documented pairing contract. 2. **`providers/aws/recommendations/client.go` (MAJOR)**: The per-service `g.Go` goroutines in `GetAllRecommendations` were calling `concurrency.Acquire`/`Release` around the whole `GetRecommendationsForService` sweep — but that helper internally loops over the 2 × 3 (term, payment) variants and waits on the rate-limiter inside each retry, so a throttled service would hold a global slot for seconds while no Cost Explorer request was actually in flight. Effective parallelism dropped well below `CUDLY_MAX_PARALLELISM`. The Acquire/Release boundary moves down to each individual `costExplorerClient.GetReservationPurchaseRecommendation` call (and the SP equivalent in `parser_sp.go`). The rate-limiter's `Wait` happens *outside* the permit, so a goroutine waiting on exponential backoff doesn't tie up a slot. Each retry acquires a fresh permit, so throughput now scales with the cap regardless of how many services are individually throttled. Tests: - `go test ./recommendations/...` from `providers/aws/` — pass - `go test -race ./concurrency/...` from `pkg/` — race-clean - Existing `go test -race ./recommendations/...` reveals a pre-existing data race on `Client.rateLimiter` (concurrent `Reset` / state mutation across the per-service goroutines), introduced by the parallelisation in #269 (`defac6c2a`). Filed as #271 — out of scope for this CR-fix commit; the fix is straightforward (per-goroutine rate-limiter or internal mutex). Other CodeRabbit checks (Azure / GCP per-service goroutines holding permits across SDK calls) are NOT applied here because: - Azure's per-service SDK calls don't have an explicit rate-limiter Wait loop equivalent to AWS's — the inner ARM/pricing calls block on network IO, not exponential backoff. - GCP's `collectRegion` sub-fan-out goroutines make similarly direct SDK calls without an explicit Wait loop.
- CR didn't flag those layers; if a future review surfaces the same argument with concrete throughput evidence, we can mirror the AWS pattern there. --- pkg/concurrency/concurrency_test.go | 18 +++++++- providers/aws/recommendations/client.go | 51 ++++++++-------------- providers/aws/recommendations/parser_sp.go | 8 ++++ 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/pkg/concurrency/concurrency_test.go b/pkg/concurrency/concurrency_test.go index 3491a337..76d08f64 100644 --- a/pkg/concurrency/concurrency_test.go +++ b/pkg/concurrency/concurrency_test.go @@ -79,13 +79,25 @@ func TestSharedSemaphore_BoundsConcurrency(t *testing.T) { } } + // Workers must never call require.* / FailNow on a non-test goroutine — + // testify's contract is that those land on the test's own goroutine + // (otherwise the failure mechanism uses runtime.Goexit on the worker + // instead of stopping the test, which can hang or skip cleanup). Each + // worker captures its Acquire result on a buffered channel and the main + // goroutine asserts after wg.Wait(). Release is only deferred on a + // successful Acquire — the documented pairing contract. var wg sync.WaitGroup + errCh := make(chan error, goroutines) for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() - require.NoError(t, Acquire(ctx)) + if err := Acquire(ctx); err != nil { + errCh <- err + return + } defer Release(ctx) + errCh <- nil cur := inflight.Add(1) updatePeak(cur) time.Sleep(2 * time.Millisecond) // make overlap observable @@ -93,6 +105,10 @@ func TestSharedSemaphore_BoundsConcurrency(t *testing.T) { }() } wg.Wait() + close(errCh) + for err := range errCh { + require.NoError(t, err) + } assert.LessOrEqual(t, peak.Load(), int32(cap), "peak concurrent in-flight goroutines must not exceed semaphore cap") diff --git a/providers/aws/recommendations/client.go b/providers/aws/recommendations/client.go index 40c566c2..31d4121f 100644 --- a/providers/aws/recommendations/client.go +++ b/providers/aws/recommendations/client.go @@ -71,7 +71,12 @@ func (c *Client) GetRecommendations(ctx context.Context, params common.Recommend AccountScope: types.AccountScopeLinked, } - // Implement rate limiting with exponential backoff + // Implement rate limiting with exponential backoff. The shared semaphore + // (if any) on ctx bounds aggregate concurrent Cost Explorer requests; we + // Acquire/Release around the SDK call itself rather than around the whole + // service sweep so a goroutine waiting on rate-limiter backoff or + // processing a response doesn't monopolise a permit while no request is + // in flight. See pkg/concurrency. var result *costexplorer.GetReservationPurchaseRecommendationOutput var err error @@ -81,7 +86,11 @@ func (c *Client) GetRecommendations(ctx context.Context, params common.Recommend return nil, fmt.Errorf("rate limiter wait failed: %w", waitErr) } + if acqErr := concurrency.Acquire(ctx); acqErr != nil { + return nil, fmt.Errorf("concurrency acquire failed: %w", acqErr) + } result, err = c.costExplorerClient.GetReservationPurchaseRecommendation(ctx, input) + concurrency.Release(ctx) if !c.rateLimiter.ShouldRetry(err) { break } @@ -193,54 +202,32 @@ func (c *Client) GetAllRecommendations(ctx context.Context) ([]common.Recommenda g, gctx := errgroup.WithContext(ctx) - // Each per-service goroutine is a leaf — it issues the actual Cost - // Explorer call. 
Acquire bounds aggregate concurrent IO across the - // whole recommendations-collection fan-out tree at the shared - // semaphore's cap (CUDLY_MAX_PARALLELISM, default 20); Release returns - // the slot. If no semaphore is on ctx (CLI tools, unit tests), - // Acquire/Release are no-ops. See pkg/concurrency. + // Per-service goroutines launch the inner (term, payment) sweep for each + // AWS service. They do NOT acquire the shared semaphore at this level — + // each service sweep makes 6 inner Cost Explorer calls (2 terms × 3 + // payment options) plus retries with rate-limiter backoff, so holding a + // permit across the whole sweep would tie up a slot during waits when + // no request is actually in flight. The Acquire/Release boundary lives + // inside GetRecommendations (around the individual CE SDK call), which + // frees slots during backoffs and gives the cap its full effective + // throughput. See pkg/concurrency. g.Go(func() error { - if err := concurrency.Acquire(gctx); err != nil { - ec2Err = err - return nil - } - defer concurrency.Release(gctx) ec2Recs, ec2Err = c.GetRecommendationsForService(gctx, common.ServiceEC2) return nil }) g.Go(func() error { - if err := concurrency.Acquire(gctx); err != nil { - rdsErr = err - return nil - } - defer concurrency.Release(gctx) rdsRecs, rdsErr = c.GetRecommendationsForService(gctx, common.ServiceRDS) return nil }) g.Go(func() error { - if err := concurrency.Acquire(gctx); err != nil { - cacheErr = err - return nil - } - defer concurrency.Release(gctx) cacheRecs, cacheErr = c.GetRecommendationsForService(gctx, common.ServiceElastiCache) return nil }) g.Go(func() error { - if err := concurrency.Acquire(gctx); err != nil { - osErr = err - return nil - } - defer concurrency.Release(gctx) osRecs, osErr = c.GetRecommendationsForService(gctx, common.ServiceOpenSearch) return nil }) g.Go(func() error { - if err := concurrency.Acquire(gctx); err != nil { - redshiftErr = err - return nil - } - defer concurrency.Release(gctx) redshiftRecs, redshiftErr = c.GetRecommendationsForService(gctx, common.ServiceRedshift) return nil }) diff --git a/providers/aws/recommendations/parser_sp.go b/providers/aws/recommendations/parser_sp.go index 7a8ce431..ac14916f 100644 --- a/providers/aws/recommendations/parser_sp.go +++ b/providers/aws/recommendations/parser_sp.go @@ -13,6 +13,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/costexplorer/types" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" ) // getSavingsPlansRecommendations fetches Savings Plans recommendations. @@ -44,6 +45,9 @@ func (c *Client) getSavingsPlansRecommendations(ctx context.Context, params comm AccountScope: types.AccountScopeLinked, } + // Acquire/Release the shared semaphore (if any on ctx) around each + // individual SDK call so rate-limiter backoff waits don't tie up a + // permit while no request is actually in flight. See pkg/concurrency. c.rateLimiter.Reset() var result *costexplorer.GetSavingsPlansPurchaseRecommendationOutput var err error @@ -53,7 +57,11 @@ func (c *Client) getSavingsPlansRecommendations(ctx context.Context, params comm return nil, fmt.Errorf("rate limiter wait failed: %w", waitErr) } + if acqErr := concurrency.Acquire(ctx); acqErr != nil { + return nil, fmt.Errorf("concurrency acquire failed: %w", acqErr) + } result, err = c.costExplorerClient.GetSavingsPlansPurchaseRecommendation(ctx, input) + concurrency.Release(ctx) if !c.rateLimiter.ShouldRetry(err) { break }