diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 9e6641cb..ad98e948 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -19,11 +19,13 @@ import ( "github.com/LeanerCloud/CUDly/internal/oidc" "github.com/LeanerCloud/CUDly/internal/purchase" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" "github.com/LeanerCloud/CUDly/pkg/logging" "github.com/LeanerCloud/CUDly/pkg/provider" azureprovider "github.com/LeanerCloud/CUDly/providers/azure" gcpprovider "github.com/LeanerCloud/CUDly/providers/gcp" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" ) // SchedulerConfig holds configuration for the scheduler @@ -168,6 +170,22 @@ func (s *Scheduler) CollectRecommendations(ctx context.Context) (*CollectResult, return nil, err } + // Attach a shared semaphore to ctx so every leaf goroutine in the + // recommendations-collection fan-out tree (AWS service, Azure service, + // GCP region×service) acquires one slot before issuing its cloud-API + // call and releases it after. This bounds aggregate concurrent IO across + // the whole tree at CUDLY_MAX_PARALLELISM (default 20) regardless of how + // nested the dispatch is — without it, peak concurrency multiplies + // through the nested fan-outs and can exhaust Lambda memory before work + // completes (observed with a 512 MB function in dev). Intermediate + // dispatchers (provider, account, GCP region) do NOT acquire — they only + // launch sub-goroutines — so no goroutine can deadlock by holding a + // permit while waiting for sub-permits. 
+ maxParallelism := concurrency.MaxParallelismFromEnv() + sem := semaphore.NewWeighted(int64(maxParallelism)) + ctx = concurrency.WithSharedSemaphore(ctx, sem) + logging.Infof("Recommendations collection: aggregate parallelism cap = %d", maxParallelism) + // Collect recommendations from each enabled provider concurrently, tracking // per-provider outcomes so persistence can scope stale-row eviction to // (provider, account) pairs that actually ran. A partial collection (e.g. diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index fc8ee994..6c055e30 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -1176,7 +1176,11 @@ func TestScheduler_ListRecommendations_ColdStartSync(t *testing.T) { // global config. Return no enabled providers so the collect is a // no-op but still runs the persistence path. mockStore.On("GetGlobalConfig", ctx).Return(&config.GlobalConfig{EnabledProviders: []string{}}, nil) - mockStore.On("UpsertRecommendations", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) + // UpsertRecommendations runs inside CollectRecommendations, after the + // shared semaphore is attached to ctx; the wrapped ctx is what reaches + // the persistence layer. mock.Anything keeps the assertion resilient + // to that wrap. + mockStore.On("UpsertRecommendations", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) mockStore.On("ListStoredRecommendations", ctx, mock.Anything). Return([]config.RecommendationRecord{}, nil) @@ -1189,7 +1193,7 @@ func TestScheduler_ListRecommendations_ColdStartSync(t *testing.T) { // CollectRecommendations, so seeing it proves the sync collect fired // before the store read. 
mockStore.AssertCalled(t, "GetGlobalConfig", ctx) - mockStore.AssertCalled(t, "UpsertRecommendations", ctx, mock.Anything, mock.Anything, mock.Anything) + mockStore.AssertCalled(t, "UpsertRecommendations", mock.Anything, mock.Anything, mock.Anything, mock.Anything) } // fanOutPerAccount bounds parallel in-flight calls to @@ -1718,7 +1722,11 @@ func TestScheduler_CollectRecommendations_WithSuccessfulRecs(t *testing.T) { // contract test cover the cancellation path). mockProvider.On("GetRecommendationsClient", mock.Anything).Return(mockRecClient, nil) mockRecClient.On("GetAllRecommendations", mock.Anything).Return(recommendations, nil) - mockEmail.On("SendNewRecommendationsNotification", ctx, mock.AnythingOfType("email.NotificationData")).Return(nil) + // SendNewRecommendationsNotification fires inside CollectRecommendations, + // after ctx has been wrapped via concurrency.WithSharedSemaphore — the + // wrapped ctx is what reaches the email sender. mock.Anything keeps the + // assertion resilient to that wrap. 
+ mockEmail.On("SendNewRecommendationsNotification", mock.Anything, mock.AnythingOfType("email.NotificationData")).Return(nil) scheduler := &Scheduler{ config: mockStore, @@ -1733,7 +1741,7 @@ func TestScheduler_CollectRecommendations_WithSuccessfulRecs(t *testing.T) { assert.Equal(t, 1, result.Recommendations) assert.Equal(t, 500.0, result.TotalSavings) - mockEmail.AssertCalled(t, "SendNewRecommendationsNotification", ctx, mock.AnythingOfType("email.NotificationData")) + mockEmail.AssertCalled(t, "SendNewRecommendationsNotification", mock.Anything, mock.AnythingOfType("email.NotificationData")) } // Test AWS recommendations fallback to GetRecommendations when GetAllRecommendations returns empty diff --git a/pkg/concurrency/concurrency.go b/pkg/concurrency/concurrency.go new file mode 100644 index 00000000..a279c5a9 --- /dev/null +++ b/pkg/concurrency/concurrency.go @@ -0,0 +1,87 @@ +// Package concurrency provides a shared global parallelism cap for the +// recommendations-collection fan-out tree. +// +// The fan-out has up to four nested levels (provider → account → service|region +// → per-region service). Each level was independently capped, so peak goroutine +// counts multiplied through the tree (3 providers × 20 accounts × 30 regions × +// 2 services = thousands of in-flight gRPC/HTTP clients). On a 512 MB Lambda +// that exhausted memory before the work could finish. +// +// A single semaphore stashed on the context lets every leaf goroutine — the +// goroutine that issues the actual cloud-API call — acquire one slot before +// doing IO and release it after, so the aggregate concurrent IO count is hard- +// bounded regardless of nesting depth. Intermediate dispatchers (provider, +// account, GCP region) do NOT acquire — they only launch sub-goroutines — so +// no goroutine can deadlock by holding a permit while waiting for sub-permits. +// +// If no semaphore is attached to the context (e.g. 
unit tests, ambient calls +// from CLI tools), Acquire and Release are no-ops; callers don't need to +// branch on whether the semaphore is set. +package concurrency + +import ( + "context" + "os" + "strconv" + + "golang.org/x/sync/semaphore" +) + +// DefaultMaxParallelism is the default cap on aggregate concurrent leaf +// goroutines across the recommendations-collection fan-out tree. Override at +// runtime with CUDLY_MAX_PARALLELISM. +const DefaultMaxParallelism = 20 + +// MaxParallelismFromEnv reads CUDLY_MAX_PARALLELISM and returns its +// positive-integer value, falling back to DefaultMaxParallelism on unset / +// invalid / non-positive values. +func MaxParallelismFromEnv() int { + if v := os.Getenv("CUDLY_MAX_PARALLELISM"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + return n + } + } + return DefaultMaxParallelism +} + +type ctxKey struct{} + +// WithSharedSemaphore returns a context carrying sem. Goroutines spawned from +// this context (or any descendant) can acquire/release slots via Acquire and +// Release. If sem is nil the context is returned unchanged. +func WithSharedSemaphore(ctx context.Context, sem *semaphore.Weighted) context.Context { + if sem == nil { + return ctx + } + return context.WithValue(ctx, ctxKey{}, sem) +} + +// SharedSemaphore returns the semaphore stashed in ctx, or nil if none. +func SharedSemaphore(ctx context.Context) *semaphore.Weighted { + sem, _ := ctx.Value(ctxKey{}).(*semaphore.Weighted) + return sem +} + +// Acquire blocks until a slot is available on the shared semaphore in ctx and +// returns nil. Returns ctx.Err() if the wait is cancelled. If no semaphore is +// attached to ctx, Acquire is a no-op and returns nil immediately — leaf +// callers can use it unconditionally without checking. +func Acquire(ctx context.Context) error { + sem := SharedSemaphore(ctx) + if sem == nil { + return nil + } + return sem.Acquire(ctx, 1) +} + +// Release returns one slot to the shared semaphore in ctx. 
Always pair with a +// successful Acquire (return value nil); calling Release after a cancelled +// Acquire would corrupt the slot count. If no semaphore is attached to ctx, +// Release is a no-op. +func Release(ctx context.Context) { + sem := SharedSemaphore(ctx) + if sem == nil { + return + } + sem.Release(1) +} diff --git a/pkg/concurrency/concurrency_test.go b/pkg/concurrency/concurrency_test.go new file mode 100644 index 00000000..76d08f64 --- /dev/null +++ b/pkg/concurrency/concurrency_test.go @@ -0,0 +1,136 @@ +package concurrency + +import ( + "context" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" +) + +// TestMaxParallelismFromEnv pins the env-knob parser semantics for +// CUDLY_MAX_PARALLELISM. +func TestMaxParallelismFromEnv(t *testing.T) { + cases := []struct { + name string + env string + want int + }{ + {"unset returns default", "", DefaultMaxParallelism}, + {"positive integer overrides", "50", 50}, + {"non-numeric falls back to default", "many", DefaultMaxParallelism}, + {"zero falls back to default", "0", DefaultMaxParallelism}, + {"negative falls back to default", "-3", DefaultMaxParallelism}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Setenv("CUDLY_MAX_PARALLELISM", tc.env) + assert.Equal(t, tc.want, MaxParallelismFromEnv()) + }) + } + + t.Run("explicit unset returns default", func(t *testing.T) { + os.Unsetenv("CUDLY_MAX_PARALLELISM") + assert.Equal(t, DefaultMaxParallelism, MaxParallelismFromEnv()) + }) +} + +// TestSharedSemaphore_NoSemaphoreOnContext verifies Acquire/Release are +// no-ops when no semaphore is attached — the documented contract that lets +// CLI tools and unit tests skip the semaphore entirely without per-call +// branching. 
+func TestSharedSemaphore_NoSemaphoreOnContext(t *testing.T) { + ctx := context.Background() + assert.Nil(t, SharedSemaphore(ctx)) + require.NoError(t, Acquire(ctx)) + Release(ctx) // must not panic +} + +// TestSharedSemaphore_WithNilSemaphore verifies WithSharedSemaphore returns +// the input ctx unchanged when sem is nil — defensive against accidental +// nil passes. +func TestSharedSemaphore_WithNilSemaphore(t *testing.T) { + ctx := context.Background() + assert.Equal(t, ctx, WithSharedSemaphore(ctx, nil)) +} + +// TestSharedSemaphore_BoundsConcurrency is the load-bearing contract test: +// with a cap of 3, 20 goroutines all calling Acquire/work/Release must +// never see more than 3 in-flight concurrently. Asserts peak concurrency +// observed via atomics. +func TestSharedSemaphore_BoundsConcurrency(t *testing.T) { + const cap = 3 + const goroutines = 20 + sem := semaphore.NewWeighted(cap) + ctx := WithSharedSemaphore(context.Background(), sem) + + var inflight, peak atomic.Int32 + updatePeak := func(cur int32) { + for { + p := peak.Load() + if cur <= p || peak.CompareAndSwap(p, cur) { + return + } + } + } + + // Workers must never call require.* / FailNow on a non-test goroutine — + // the testing package requires FailNow to run on the goroutine executing + // the test function (otherwise it only runtime.Goexit()s the worker + // instead of stopping the test, which can hang or skip cleanup). Each + // worker captures its Acquire result on a buffered channel and the main + // goroutine asserts after wg.Wait(). Release is only deferred on a + // successful Acquire — the documented pairing contract. 
+ var wg sync.WaitGroup + errCh := make(chan error, goroutines) + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if err := Acquire(ctx); err != nil { + errCh <- err + return + } + defer Release(ctx) + errCh <- nil + cur := inflight.Add(1) + updatePeak(cur) + time.Sleep(2 * time.Millisecond) // make overlap observable + inflight.Add(-1) + }() + } + wg.Wait() + close(errCh) + for err := range errCh { + require.NoError(t, err) + } + + assert.LessOrEqual(t, peak.Load(), int32(cap), + "peak concurrent in-flight goroutines must not exceed semaphore cap") + assert.GreaterOrEqual(t, peak.Load(), int32(2), + "with %d goroutines and cap %d, peak should reach at least 2 (proves goroutines genuinely overlapped)", + goroutines, cap) +} + +// TestSharedSemaphore_AcquireRespectsCancellation verifies Acquire returns +// ctx.Err() when the parent ctx is cancelled while waiting for a slot. +// Without this, a cancelled refresh would leak a goroutine parked +// indefinitely on Acquire. +func TestSharedSemaphore_AcquireRespectsCancellation(t *testing.T) { + sem := semaphore.NewWeighted(1) + // Pre-occupy the only slot so the second Acquire must wait. + require.NoError(t, sem.Acquire(context.Background(), 1)) + defer sem.Release(1) + + ctx, cancel := context.WithCancel(WithSharedSemaphore(context.Background(), sem)) + cancel() // cancel before Acquire even starts + + err := Acquire(ctx) + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) +} diff --git a/pkg/go.mod b/pkg/go.mod index 88badf28..3c4f40eb 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -1,8 +1,6 @@ module github.com/LeanerCloud/CUDly/pkg -go 1.23 - -toolchain go1.24.4 +go 1.25.0 // This module contains shared types, provider interfaces, and the exchange package. // The exchange package has AWS SDK dependencies for RI exchange operations. 
@@ -12,7 +10,10 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.26.2 github.com/aws/aws-sdk-go-v2/service/ec2 v1.251.2 github.com/aws/aws-sdk-go-v2/service/sts v1.26.6 + github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.11.1 + golang.org/x/sync v0.20.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -27,8 +28,5 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 // indirect github.com/aws/smithy-go v1.24.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/go.sum b/pkg/go.sum index ceedd991..a03f6a92 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -30,14 +30,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= gopkg.in/check.v1 
v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/providers/aws/recommendations/client.go b/providers/aws/recommendations/client.go index 5f454ee5..31d4121f 100644 --- a/providers/aws/recommendations/client.go +++ b/providers/aws/recommendations/client.go @@ -11,6 +11,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" "github.com/LeanerCloud/CUDly/pkg/logging" ) @@ -70,7 +71,12 @@ func (c *Client) GetRecommendations(ctx context.Context, params common.Recommend AccountScope: types.AccountScopeLinked, } - // Implement rate limiting with exponential backoff + // Implement rate limiting with exponential backoff. The shared semaphore + // (if any) on ctx bounds aggregate concurrent Cost Explorer requests; we + // Acquire/Release around the SDK call itself rather than around the whole + // service sweep so a goroutine waiting on rate-limiter backoff or + // processing a response doesn't monopolise a permit while no request is + // in flight. See pkg/concurrency. 
var result *costexplorer.GetReservationPurchaseRecommendationOutput var err error @@ -80,7 +86,11 @@ func (c *Client) GetRecommendations(ctx context.Context, params common.Recommend return nil, fmt.Errorf("rate limiter wait failed: %w", waitErr) } + if acqErr := concurrency.Acquire(ctx); acqErr != nil { + return nil, fmt.Errorf("concurrency acquire failed: %w", acqErr) + } result, err = c.costExplorerClient.GetReservationPurchaseRecommendation(ctx, input) + concurrency.Release(ctx) if !c.rateLimiter.ShouldRetry(err) { break } @@ -192,6 +202,15 @@ func (c *Client) GetAllRecommendations(ctx context.Context) ([]common.Recommenda g, gctx := errgroup.WithContext(ctx) + // Per-service goroutines launch the inner (term, payment) sweep for each + // AWS service. They do NOT acquire the shared semaphore at this level — + // each service sweep makes 6 inner Cost Explorer calls (2 terms × 3 + // payment options) plus retries with rate-limiter backoff, so holding a + // permit across the whole sweep would tie up a slot during waits when + // no request is actually in flight. The Acquire/Release boundary lives + // inside GetRecommendations (around the individual CE SDK call), which + // frees slots during backoffs and gives the cap its full effective + // throughput. See pkg/concurrency. g.Go(func() error { ec2Recs, ec2Err = c.GetRecommendationsForService(gctx, common.ServiceEC2) return nil diff --git a/providers/aws/recommendations/parser_sp.go b/providers/aws/recommendations/parser_sp.go index 7a8ce431..ac14916f 100644 --- a/providers/aws/recommendations/parser_sp.go +++ b/providers/aws/recommendations/parser_sp.go @@ -13,6 +13,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/costexplorer/types" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" ) // getSavingsPlansRecommendations fetches Savings Plans recommendations. 
@@ -44,6 +45,9 @@ func (c *Client) getSavingsPlansRecommendations(ctx context.Context, params comm AccountScope: types.AccountScopeLinked, } + // Acquire/Release the shared semaphore (if any on ctx) around each + // individual SDK call so rate-limiter backoff waits don't tie up a + // permit while no request is actually in flight. See pkg/concurrency. c.rateLimiter.Reset() var result *costexplorer.GetSavingsPlansPurchaseRecommendationOutput var err error @@ -53,7 +57,11 @@ func (c *Client) getSavingsPlansRecommendations(ctx context.Context, params comm return nil, fmt.Errorf("rate limiter wait failed: %w", waitErr) } + if acqErr := concurrency.Acquire(ctx); acqErr != nil { + return nil, fmt.Errorf("concurrency acquire failed: %w", acqErr) + } result, err = c.costExplorerClient.GetSavingsPlansPurchaseRecommendation(ctx, input) + concurrency.Release(ctx) if !c.rateLimiter.ShouldRetry(err) { break } diff --git a/providers/azure/recommendations.go b/providers/azure/recommendations.go index ffb773d9..ea0c007a 100644 --- a/providers/azure/recommendations.go +++ b/providers/azure/recommendations.go @@ -12,6 +12,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" "github.com/LeanerCloud/CUDly/pkg/logging" "github.com/LeanerCloud/CUDly/providers/azure/services/cache" "github.com/LeanerCloud/CUDly/providers/azure/services/compute" @@ -70,48 +71,67 @@ func (r *RecommendationsClientAdapter) GetRecommendations(ctx context.Context, p g, gctx := errgroup.WithContext(ctx) + // Each per-service goroutine is a leaf — it issues the actual ARM / + // pricing-API call. The shared semaphore on gctx (set by the scheduler) + // bounds aggregate concurrent IO across the whole recommendations- + // collection fan-out tree at CUDLY_MAX_PARALLELISM (default 20). If no + // semaphore is attached (CLI tools, unit tests), Acquire/Release are + // no-ops. See pkg/concurrency. 
+ // + // goService extracts the Acquire/Release boilerplate so each per- + // service block stays a single g.Go call — keeps GetRecommendations + // under the project's gocyclo gate (the per-service `if Acquire {…}` + // branches counted toward this function's cyclomatic complexity + // before extraction). + goService := func(errOut *error, fn func()) { + g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + *errOut = err + return nil + } + defer concurrency.Release(gctx) + fn() + return nil // error isolation: never propagate to errgroup + }) + } + // Compute (VM) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceCompute) { - g.Go(func() error { + goService(&computeErr, func() { computeClient := compute.NewClient(r.cred, r.subscriptionID, "") computeRecs, computeErr = computeClient.GetRecommendations(gctx, params) - return nil // error isolation: never propagate to errgroup }) } // Database (SQL) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceRelationalDB) { - g.Go(func() error { + goService(&dbErr, func() { dbClient := database.NewClient(r.cred, r.subscriptionID, "") dbRecs, dbErr = dbClient.GetRecommendations(gctx, params) - return nil }) } // Cache (Redis) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceCache) { - g.Go(func() error { + goService(&cacheErr, func() { cacheClient := cache.NewClient(r.cred, r.subscriptionID, "") cacheRecs, cacheErr = cacheClient.GetRecommendations(gctx, params) - return nil }) } // CosmosDB (NoSQL) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceNoSQL) { - g.Go(func() error { + goService(&cosmosErr, func() { cosmosClient := cosmosdb.NewClient(r.cred, r.subscriptionID, "") cosmosRecs, cosmosErr = cosmosClient.GetRecommendations(gctx, params) - return nil }) } // Azure Advisor adds cross-cutting cost recommendations independent of the // per-service Reservation API. 
Failures here are non-fatal — the per-service // results above are still useful on their own. - g.Go(func() error { + goService(&advisorErr, func() { advisorRecs, advisorErr = r.getAdvisorRecommendations(gctx, params) - return nil }) // Wait for all goroutines. g.Wait() always returns nil because every diff --git a/providers/gcp/go.mod b/providers/gcp/go.mod index 1972df7d..ea76ed0f 100644 --- a/providers/gcp/go.mod +++ b/providers/gcp/go.mod @@ -1,8 +1,6 @@ module github.com/LeanerCloud/CUDly/providers/gcp -go 1.23 - -toolchain go1.24.4 +go 1.25.0 require ( cloud.google.com/go/compute v1.23.3 @@ -14,7 +12,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 github.com/stretchr/testify v1.11.1 golang.org/x/oauth2 v0.16.0 - golang.org/x/sync v0.6.0 + golang.org/x/sync v0.20.0 google.golang.org/api v0.160.0 google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac google.golang.org/grpc v1.61.0 diff --git a/providers/gcp/go.sum b/providers/gcp/go.sum index c1ad5f99..61f0858c 100644 --- a/providers/gcp/go.sum +++ b/providers/gcp/go.sum @@ -135,8 +135,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/providers/gcp/recommendations.go b/providers/gcp/recommendations.go index b7b8debd..b00af976 100644 --- a/providers/gcp/recommendations.go +++ b/providers/gcp/recommendations.go @@ -13,6 +13,7 @@ import ( "google.golang.org/api/option" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/concurrency" "github.com/LeanerCloud/CUDly/pkg/logging" "github.com/LeanerCloud/CUDly/providers/gcp/services/cloudsql" "github.com/LeanerCloud/CUDly/providers/gcp/services/computeengine" @@ -154,8 +155,22 @@ func (r *RecommendationsClientAdapter) collectRegion(ctx context.Context, params g, gctx := errgroup.WithContext(ctx) + // Per-(region, service) goroutines are leaves — they issue the actual + // Recommender API call. Acquire bounds aggregate concurrent IO across + // the whole recommendations-collection fan-out tree at the shared + // semaphore's cap (CUDLY_MAX_PARALLELISM, default 20); Release returns + // the slot. Without this bound the per-region fan-out (cap 10) × + // per-service sub-fan-out (2) × accounts × providers can produce + // hundreds of concurrent gRPC clients that exhaust Lambda memory. If + // no semaphore is on ctx (CLI tools, unit tests), Acquire/Release are + // no-ops. See pkg/concurrency. if shouldIncludeService(params, common.ServiceCompute) { g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + computeErr = err + return nil + } + defer concurrency.Release(gctx) client, err := computeengine.NewClient(gctx, r.projectID, region, r.clientOpts...) 
if err != nil { computeErr = err @@ -167,6 +182,11 @@ func (r *RecommendationsClientAdapter) collectRegion(ctx context.Context, params } if shouldIncludeService(params, common.ServiceRelationalDB) { g.Go(func() error { + if err := concurrency.Acquire(gctx); err != nil { + sqlErr = err + return nil + } + defer concurrency.Release(gctx) client, err := cloudsql.NewClient(gctx, r.projectID, region, r.clientOpts...) if err != nil { sqlErr = err diff --git a/terraform/environments/aws/github-dev.tfvars b/terraform/environments/aws/github-dev.tfvars index b6c06449..abcfff29 100644 --- a/terraform/environments/aws/github-dev.tfvars +++ b/terraform/environments/aws/github-dev.tfvars @@ -18,7 +18,7 @@ compute_platform = "lambda" enable_docker_build = true # Build and push image via terraform apply on the runner # Lambda Configuration -lambda_memory_size = 512 +lambda_memory_size = 2048 lambda_timeout = 60 lambda_reserved_concurrency = -1 lambda_log_retention_days = 7