From 00b8fd870d62cc5b32f3dbdec78461b365410019 Mon Sep 17 00:00:00 2001 From: Cristian Magherusan-Stanciu Date: Mon, 4 May 2026 21:30:46 +0200 Subject: [PATCH 1/3] perf(aws): parallelize 5 sequential service calls in GetAllRecommendations (closes #266) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The dispatcher in providers/aws/recommendations/client.go::GetAllRecommendations ran 5 service calls (EC2 / RDS / ElastiCache / OpenSearch / Redshift) one after another with an artificial 100ms stagger between each. Each per- service call hits AWS Cost Explorer's GetReservationPurchaseRecommendation, which routinely takes multiple seconds. Total wall time per account scaled as sum(per-service) instead of max(per-service). Mirrors the Azure parallelisation in providers/azure/recommendations.go (closes #258, commit b10326c5). The five calls have no inter-service dependency, so they can run concurrently under errgroup.WithContext. Each goroutine captures its own error in a closure-scoped variable and returns nil to the group so one service's failure does not cancel siblings (matching the previous loop's `continue`-on-error tolerance). Results are merged in the canonical order EC2 → RDS → ElastiCache → OpenSearch → Redshift after Wait() so any order-sensitive callers stay stable. After Wait(), ctx.Err() is checked and propagated so a canceled parent ctx surfaces as context.Canceled / context.DeadlineExceeded rather than being swallowed by the per-service error-isolation goroutines (which all return nil). The 100ms artificial stagger is removed — it was a sequential-mode rate- limit hack that adds 400ms per account for no benefit under concurrent fan-out (Cost Explorer rate limits apply per request, not per consecutive request). Behaviour change vs the previous sequential loop: per-service errors are now logged at WARN via the new mergeServiceResults helper. The previous loop swallowed them silently with a bare `continue`, leaving operators no signal when a single service was misbehaving. The serviceResult struct + mergeServiceResults helper extraction also keeps GetAllRecommendations under the project's gocyclo gate (.golangci.yml min-complexity: 15). New test TestGetAllRecommendations_PropagatesContextCancellation pins the contract: calling GetAllRecommendations with a pre-cancelled context must return context.Canceled. Mirrors the Azure equivalent at providers/azure/recommendations_test.go::TestRecommendationsClientAdapter_GetRecommendations_PropagatesContextCancellation. Expected end-to-end after this change: ~max(individual call durations) instead of sum + 4×100ms stagger. 
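For reviewers who want the pattern in isolation, a minimal self-contained
sketch of the error-isolation fan-out used below. Names are illustrative
(fetch stands in for the per-service Cost Explorer call); only the errgroup
usage mirrors the real code:

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    // fetch stands in for a per-service Cost Explorer call.
    func fetch(ctx context.Context, svc string) (int, error) {
        if err := ctx.Err(); err != nil {
            return 0, err
        }
        return len(svc), nil
    }

    func main() {
        ctx := context.Background()

        var ec2, rds int
        var ec2Err, rdsErr error

        g, gctx := errgroup.WithContext(ctx)
        // Each goroutine captures its error and returns nil, so one
        // service's failure never cancels its siblings through gctx.
        g.Go(func() error { ec2, ec2Err = fetch(gctx, "ec2"); return nil })
        g.Go(func() error { rds, rdsErr = fetch(gctx, "rds"); return nil })
        _ = g.Wait() // always nil: errors were captured above

        // Post-Wait check: distinguishes "finished, possibly with
        // per-service errors" from "parent ctx canceled mid-fan-out".
        if err := ctx.Err(); err != nil {
            fmt.Println("canceled:", err)
            return
        }
        fmt.Println(ec2, ec2Err, rds, rdsErr)
    }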
--- providers/aws/go.mod | 6 +- providers/aws/go.sum | 4 +- providers/aws/recommendations/client.go | 113 ++++++++++++++----- providers/aws/recommendations/client_test.go | 36 ++++++ 4 files changed, 125 insertions(+), 34 deletions(-) diff --git a/providers/aws/go.mod b/providers/aws/go.mod index 448cdf15..f260749e 100644 --- a/providers/aws/go.mod +++ b/providers/aws/go.mod @@ -1,8 +1,6 @@ module github.com/LeanerCloud/CUDly/providers/aws -go 1.23 - -toolchain go1.24.4 +go 1.25.0 require ( github.com/LeanerCloud/CUDly/pkg v0.0.0 @@ -20,6 +18,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.26.6 github.com/aws/smithy-go v1.24.0 github.com/stretchr/testify v1.11.1 + golang.org/x/sync v0.20.0 ) require ( @@ -33,7 +32,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.18.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/providers/aws/go.sum b/providers/aws/go.sum index cb0a64bf..7bc9ff6f 100644 --- a/providers/aws/go.sum +++ b/providers/aws/go.sum @@ -46,14 +46,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/providers/aws/recommendations/client.go b/providers/aws/recommendations/client.go index 0d808fcb..5f454ee5 100644 --- a/providers/aws/recommendations/client.go +++ b/providers/aws/recommendations/client.go @@ -4,13 +4,14 @@ package recommendations import ( "context" "fmt" - "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/costexplorer" "github.com/aws/aws-sdk-go-v2/service/costexplorer/types" + "golang.org/x/sync/errgroup" "github.com/LeanerCloud/CUDly/pkg/common" + "github.com/LeanerCloud/CUDly/pkg/logging" ) // CostExplorerAPI defines the interface for Cost Explorer operations @@ -169,38 +170,94 @@ func (c *Client) GetRecommendationsForService(ctx context.Context, service commo return allRecs, nil } -// GetAllRecommendations fetches recommendations for all supported services +// 
GetAllRecommendations fetches recommendations for all supported services. +// +// All five service calls run concurrently under errgroup. Each goroutine +// captures its own error in a closure-scoped variable and returns nil to the +// group so a single per-service failure does not cancel its siblings (matching +// the previous loop's `continue`-on-error tolerance). Results are merged in +// the canonical order EC2 → RDS → ElastiCache → OpenSearch → Redshift after +// all goroutines finish so order-sensitive consumers stay stable. +// +// Behaviour change vs the previous sequential loop: per-service errors are +// now logged at WARN via mergeServiceResults — the previous loop swallowed +// them silently with a bare `continue`, leaving operators no signal when a +// single service was misbehaving. Mirrors the Azure parallelisation in +// providers/azure/recommendations.go (closes #258, commit b10326c5). func (c *Client) GetAllRecommendations(ctx context.Context) ([]common.Recommendation, error) { - services := []common.ServiceType{ - common.ServiceEC2, - common.ServiceRDS, - common.ServiceElastiCache, - common.ServiceOpenSearch, - common.ServiceRedshift, + var ( + ec2Recs, rdsRecs, cacheRecs, osRecs, redshiftRecs []common.Recommendation + ec2Err, rdsErr, cacheErr, osErr, redshiftErr error + ) + + g, gctx := errgroup.WithContext(ctx) + + g.Go(func() error { + ec2Recs, ec2Err = c.GetRecommendationsForService(gctx, common.ServiceEC2) + return nil + }) + g.Go(func() error { + rdsRecs, rdsErr = c.GetRecommendationsForService(gctx, common.ServiceRDS) + return nil + }) + g.Go(func() error { + cacheRecs, cacheErr = c.GetRecommendationsForService(gctx, common.ServiceElastiCache) + return nil + }) + g.Go(func() error { + osRecs, osErr = c.GetRecommendationsForService(gctx, common.ServiceOpenSearch) + return nil + }) + g.Go(func() error { + redshiftRecs, redshiftErr = c.GetRecommendationsForService(gctx, common.ServiceRedshift) + return nil + }) + + // Wait for all goroutines. g.Wait() always returns nil because every + // goroutine returns nil — errors are captured per-service above. After + // Wait, propagate ctx cancellation so callers can distinguish "all five + // services completed (with possibly per-service errors)" from "the + // parent ctx was canceled mid-fan-out". + _ = g.Wait() + if err := ctx.Err(); err != nil { + return nil, err } - allRecommendations := make([]common.Recommendation, 0) + return mergeServiceResults( + serviceResult{name: "EC2", recs: ec2Recs, err: ec2Err}, + serviceResult{name: "RDS", recs: rdsRecs, err: rdsErr}, + serviceResult{name: "ElastiCache", recs: cacheRecs, err: cacheErr}, + serviceResult{name: "OpenSearch", recs: osRecs, err: osErr}, + serviceResult{name: "Redshift", recs: redshiftRecs, err: redshiftErr}, + ), nil +} - for _, service := range services { - recs, err := c.GetRecommendationsForService(ctx, service) - if err != nil { - // Same rationale as in GetRecommendationsForService: a - // canceled / deadline-exceeded ctx is not a recoverable - // per-service error — short-circuit instead of marching - // through the remaining services and silently swallowing - // the cancellation. - if ctx.Err() != nil { - return nil, ctx.Err() - } +// serviceResult bundles a per-service collection outcome for the deterministic +// merge in mergeServiceResults. Extracted into a helper so GetAllRecommendations +// stays under the gocyclo gate (.golangci.yml min-complexity: 15) after the +// post-Wait ctx.Err() block was added. 
+type serviceResult struct { + name string + recs []common.Recommendation + err error +} + +// mergeServiceResults logs per-service errors at WARN and appends successful +// results in the order the slice is passed — callers must preserve the +// canonical EC2 → RDS → ElastiCache → OpenSearch → Redshift order so that +// order-sensitive consumers stay stable. +func mergeServiceResults(results ...serviceResult) []common.Recommendation { + total := 0 + for _, r := range results { + total += len(r.recs) + } + out := make([]common.Recommendation, 0, total) + for _, r := range results { + if r.err != nil { + logging.Warnf("AWS %s recommendations: %v", r.name, r.err) continue } - allRecommendations = append(allRecommendations, recs...) - select { - case <-time.After(100 * time.Millisecond): - case <-ctx.Done(): - return nil, ctx.Err() - } + out = append(out, r.recs...) } - - return allRecommendations, nil + return out } diff --git a/providers/aws/recommendations/client_test.go b/providers/aws/recommendations/client_test.go index 9a60b56c..c99c3e3b 100644 --- a/providers/aws/recommendations/client_test.go +++ b/providers/aws/recommendations/client_test.go @@ -519,3 +519,39 @@ func TestGetRecommendations_ContextCancellation(t *testing.T) { assert.Nil(t, recs) assert.Contains(t, err.Error(), "rate limiter wait failed") } + +// TestGetAllRecommendations_PropagatesContextCancellation pins the contract +// that GetAllRecommendations propagates ctx.Err() to its caller after the +// errgroup Wait() — the parent context being cancelled or its deadline +// exceeding must surface as an error rather than being swallowed by the +// per-service error-isolation goroutines (which all return nil to the +// errgroup so a single per-service failure does not cancel siblings). +// +// Without the explicit `if err := ctx.Err(); err != nil { return nil, err }` +// after `g.Wait()`, callers that wrap GetAllRecommendations with a deadline +// could see "all services finished cleanly" even when the deadline expired +// mid-fan-out (because every goroutine returned nil from its closure). +// +// Mirrors providers/azure/recommendations_test.go's +// TestRecommendationsClientAdapter_GetRecommendations_PropagatesContextCancellation. +func TestGetAllRecommendations_PropagatesContextCancellation(t *testing.T) { + mockAPI := &mockCostExplorerAPI{ + riError: newThrottleError(), + } + client := NewClientWithAPI(mockAPI, "us-east-1") + + // Cancel the context BEFORE the call so we don't depend on race-y + // timing inside the SDK clients. The Cost Explorer calls inside the + // goroutines observe the cancelled gctx (derived from ctx via + // errgroup.WithContext) and either short-circuit or return cancelled + // errors; either way, our post-Wait ctx.Err() check returns + // context.Canceled. 
+ ctx, cancel := context.WithCancel(context.Background()) + cancel() + + recs, err := client.GetAllRecommendations(ctx) + require.Error(t, err, "expected context.Canceled to propagate from GetAllRecommendations") + assert.ErrorIs(t, err, context.Canceled, + "GetAllRecommendations must propagate the parent ctx error after g.Wait()") + assert.Nil(t, recs) +} From 65a781e10836242d1d7e47dc5608dc0e4dd31720 Mon Sep 17 00:00:00 2001 From: Cristian Magherusan-Stanciu Date: Mon, 4 May 2026 21:37:19 +0200 Subject: [PATCH 2/3] =?UTF-8?q?perf(gcp):=20parallelize=20region=20=C3=97?= =?UTF-8?q?=20service=20nested=20loops=20in=20GetRecommendations=20(closes?= =?UTF-8?q?=20#267)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The dispatcher in providers/gcp/recommendations.go::GetRecommendations was doubly serial — `for region { for service { ... } }` — fetching Compute Engine then Cloud SQL recommendations one region at a time. With ~30+ GCP regions × 2 services, that's 60+ sequential RPCs per account against the project-scoped Recommender API. Even at 200ms per call this is a 12s+ floor; in practice most calls are slower. After #260 (async self- invoke) this no longer triggers user-facing 502s, but it directly inflates the scheduler Lambda's runtime and the in-flight freshness- banner window. Two-level concurrent fan-out, mirroring the Azure pattern in providers/azure/recommendations.go (closes #258, commit b10326c5) and the AWS service-loop parallelisation (closes #266): - Outer: errgroup over regions, capped at gcpRegionConcurrency() (CUDLY_GCP_REGION_PARALLELISM, default 10) to stay polite to the project-scoped Recommender API quota. Lower than the existing account-level CUDLY_MAX_ACCOUNT_PARALLELISM (default 20) because each account already gets its own goroutine slot from fanOutPerAccount; cumulative concurrency would multiply otherwise. - Inner: within each region's goroutine, Compute + Cloud SQL run as two further goroutines under a per-region sub-errgroup, so per- region cost is max(compute, sql) rather than compute + sql. Each goroutine returns nil to its errgroup so a single per-(region, service) failure does not cancel siblings — preserves the previous silent-skip-on-err shape semantically. Behaviour change vs the previous nested for-loops: per-(region, service) errors that were silently swallowed by the previous `if err == nil { ... }` shape are now logged at WARN with the region+service tag so misconfigured projects are diagnosable. Errors still don't propagate to callers (preserving silent-skip), only the log severity changes from "invisible" to WARN. After the outer Wait(), ctx.Err() is checked and propagated so a canceled parent ctx surfaces as context.Canceled / context.Deadline- Exceeded rather than being swallowed by the per-region error-isolation goroutines (which all return nil). Result merging walks regions in sorted order and appends compute then sql per region — output is deterministic independent of GCP API region-list ordering or goroutine completion order. The collectRegion helper extraction also keeps GetRecommendations under the project's gocyclo gate (.golangci.yml min-complexity: 15) after the post-Wait ctx.Err() block was added. Tests added: - TestRecommendationsClientAdapter_GetRecommendations_PropagatesContextCancellation pins the contract that a pre-cancelled ctx surfaces as context.Canceled. - TestGCPRegionConcurrency pins the env-knob parser semantics (unset/positive/non-numeric/zero/negative/explicit-unset). 
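For reviewers, the two-level shape in isolation: a self-contained sketch
with placeholder regions and stubbed per-region work (the real inner
fan-out lives in collectRegion in the diff below). Only the
errgroup/SetLimit usage mirrors the real code:

    package main

    import (
        "context"
        "fmt"
        "sort"
        "sync"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        ctx := context.Background()
        regions := []string{"us-central1", "europe-west1", "asia-east1"}

        var (
            mu      sync.Mutex
            results = make(map[string][]string, len(regions))
        )

        g, gctx := errgroup.WithContext(ctx)
        g.SetLimit(2) // cf. gcpRegionConcurrency(): bounds in-flight regions

        for _, region := range regions {
            region := region // capture per-iteration (pre-Go-1.22 loop semantics)
            g.Go(func() error {
                _ = gctx // real code hands gctx to the Recommender calls
                mu.Lock()
                results[region] = []string{region + "/compute", region + "/sql"}
                mu.Unlock()
                return nil // per-region errors are logged, never returned
            })
        }
        _ = g.Wait()

        // Deterministic merge: sorted region walk, compute then sql,
        // independent of goroutine completion order.
        names := make([]string, 0, len(results))
        for r := range results {
            names = append(names, r)
        }
        sort.Strings(names)
        for _, r := range names {
            fmt.Println(results[r])
        }
    }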
Expected end-to-end after this change: ceil(N_regions / cap) × max(compute,
sql) instead of N_regions × (compute + sql). For a project with 30 regions
and a cap of 10, the region-level fan-out alone is a ~10× speedup (30
sequential region passes collapse into 3 waves of 10), before counting the
per-region service-level parallelism win.
---
 providers/gcp/go.mod                  |   2 +-
 providers/gcp/recommendations.go      | 179 ++++++++++++++++++++++----
 providers/gcp/recommendations_test.go |  71 ++++++++++
 3 files changed, 227 insertions(+), 25 deletions(-)

diff --git a/providers/gcp/go.mod b/providers/gcp/go.mod
index 5b85bcc9..1972df7d 100644
--- a/providers/gcp/go.mod
+++ b/providers/gcp/go.mod
@@ -14,6 +14,7 @@ require (
 	github.com/googleapis/gax-go/v2 v2.12.0
 	github.com/stretchr/testify v1.11.1
 	golang.org/x/oauth2 v0.16.0
+	golang.org/x/sync v0.6.0
 	google.golang.org/api v0.160.0
 	google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac
 	google.golang.org/grpc v1.61.0
@@ -44,7 +45,6 @@ require (
 	go.opentelemetry.io/otel/trace v1.22.0 // indirect
 	golang.org/x/crypto v0.18.0 // indirect
 	golang.org/x/net v0.20.0 // indirect
-	golang.org/x/sync v0.6.0 // indirect
 	golang.org/x/sys v0.16.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
 	golang.org/x/time v0.5.0 // indirect
diff --git a/providers/gcp/recommendations.go b/providers/gcp/recommendations.go
index 51f5206e..b7b8debd 100644
--- a/providers/gcp/recommendations.go
+++ b/providers/gcp/recommendations.go
@@ -4,14 +4,50 @@ package gcp
 import (
 	"context"
 	"fmt"
+	"os"
+	"sort"
+	"strconv"
+	"sync"
 
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/api/option"
 
 	"github.com/LeanerCloud/CUDly/pkg/common"
+	"github.com/LeanerCloud/CUDly/pkg/logging"
 	"github.com/LeanerCloud/CUDly/providers/gcp/services/cloudsql"
 	"github.com/LeanerCloud/CUDly/providers/gcp/services/computeengine"
 )
 
+// defaultGCPRegionConcurrency caps the parallel per-region goroutines inside
+// a single GetRecommendations call. The GCP Recommender API is project-scoped
+// and per-region calls share the project's quota, so the cap is intentionally
+// modest. Override at runtime via CUDLY_GCP_REGION_PARALLELISM.
+const defaultGCPRegionConcurrency = 10
+
+// gcpRegionConcurrency reads the CUDLY_GCP_REGION_PARALLELISM env var and
+// returns its positive-integer value, falling back to
+// defaultGCPRegionConcurrency on unset / invalid / non-positive values. The
+// helper is local to the gcp package because the providers/gcp module is a
+// separate Go module from internal/execution and cannot import its
+// ConcurrencyFromEnv counterpart directly.
+func gcpRegionConcurrency() int {
+	if v := os.Getenv("CUDLY_GCP_REGION_PARALLELISM"); v != "" {
+		if n, err := strconv.Atoi(v); err == nil && n > 0 {
+			return n
+		}
+	}
+	return defaultGCPRegionConcurrency
+}
+
+// regionResult bundles the Compute Engine and Cloud SQL recommendation slices
+// returned for a single GCP region. The merge in GetRecommendations walks
+// regions in sorted order and appends compute then sql per region so output
+// is deterministic independent of goroutine completion order.
+type regionResult struct { + compute []common.Recommendation + sql []common.Recommendation +} + // RecommendationsClientAdapter aggregates GCP CUD and commitment recommendations across all services type RecommendationsClientAdapter struct { ctx context.Context @@ -19,44 +55,139 @@ type RecommendationsClientAdapter struct { clientOpts []option.ClientOption } -// GetRecommendations retrieves all GCP commitment recommendations across all services and regions +// GetRecommendations retrieves all GCP commitment recommendations across all +// services and regions. +// +// Two-level concurrent fan-out: +// - Outer: errgroup over regions, capped at gcpRegionConcurrency() +// (CUDLY_GCP_REGION_PARALLELISM, default 10) to stay polite to the +// project-scoped Recommender API quota. +// - Inner: within each region's goroutine, the (compute, cloud-sql) calls +// run as two further goroutines under a per-region sub-errgroup, so the +// per-region cost is max(compute, sql) rather than compute + sql. +// +// Behaviour change vs the previous nested for-loops: per-(region, service) +// errors that were previously silently swallowed (`if err == nil { ... }` +// shape) are now logged at WARN with region+service identifiers so +// misconfigured projects are diagnosable. Errors do NOT cancel siblings — +// each goroutine returns nil to its errgroup, matching the previous +// silent-skip-on-err semantics. +// +// After the outer Wait(), ctx.Err() is checked and propagated so a canceled +// parent ctx surfaces as context.Canceled / context.DeadlineExceeded rather +// than being swallowed by the per-region error-isolation goroutines. +// +// Mirrors the Azure parallelisation in +// providers/azure/recommendations.go (closes #258, commit b10326c5) and the +// AWS service-loop parallelisation (closes #266). func (r *RecommendationsClientAdapter) GetRecommendations(ctx context.Context, params common.RecommendationParams) ([]common.Recommendation, error) { - allRecommendations := make([]common.Recommendation, 0) - // Get list of regions to check regions, err := r.getRegions(ctx) if err != nil { return nil, fmt.Errorf("failed to get regions: %w", err) } - // Collect recommendations from each service type across all regions + // Collect per-region results into a map keyed by region name. The merge + // step walks regions in sorted order so the output is deterministic + // independent of goroutine completion order — keeps snapshot tests + // stable. + var ( + mu sync.Mutex + results = make(map[string]regionResult, len(regions)) + ) + + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(gcpRegionConcurrency()) + for _, region := range regions { - // Compute Engine CUD recommendations - if shouldIncludeService(params, common.ServiceCompute) { - computeClient, err := computeengine.NewClient(ctx, r.projectID, region, r.clientOpts...) - if err == nil { - computeRecs, err := computeClient.GetRecommendations(ctx, params) - if err == nil { - allRecommendations = append(allRecommendations, computeRecs...) - } - } - } + region := region // capture per-iteration + g.Go(func() error { + res := r.collectRegion(gctx, params, region) + mu.Lock() + results[region] = res + mu.Unlock() + return nil // error isolation: per-region failures don't cancel siblings + }) + } - // Cloud SQL commitment recommendations - if shouldIncludeService(params, common.ServiceRelationalDB) { - sqlClient, err := cloudsql.NewClient(ctx, r.projectID, region, r.clientOpts...) 
- if err == nil { - sqlRecs, err := sqlClient.GetRecommendations(ctx, params) - if err == nil { - allRecommendations = append(allRecommendations, sqlRecs...) - } - } - } + // Wait for all region goroutines. g.Wait() always returns nil because + // every goroutine returns nil — errors are logged inside collectRegion. + // After Wait, propagate ctx cancellation so callers can distinguish + // "all regions completed (with possibly per-region errors)" from "the + // parent ctx was canceled mid-fan-out". + _ = g.Wait() + if err := ctx.Err(); err != nil { + return nil, err + } + + // Deterministic merge: walk regions in sorted order, append compute then + // sql per region. Output is stable regardless of GCP API region-list + // ordering or goroutine completion order. + sortedRegions := make([]string, 0, len(results)) + for region := range results { + sortedRegions = append(sortedRegions, region) } + sort.Strings(sortedRegions) + allRecommendations := make([]common.Recommendation, 0) + for _, region := range sortedRegions { + res := results[region] + allRecommendations = append(allRecommendations, res.compute...) + allRecommendations = append(allRecommendations, res.sql...) + } return allRecommendations, nil } +// collectRegion fetches Compute Engine and Cloud SQL recommendations for a +// single region concurrently. Per-service errors are logged at WARN with the +// region+service tag and never propagate — the previous silent-skip-on-err +// shape is preserved (so a misconfigured project doesn't error out the whole +// recommendations refresh) but errors are now observable in logs. Extracted +// from GetRecommendations to keep that function under the gocyclo gate +// (.golangci.yml min-complexity: 15) after the post-Wait ctx.Err() block was +// added. +func (r *RecommendationsClientAdapter) collectRegion(ctx context.Context, params common.RecommendationParams, region string) regionResult { + var ( + computeRecs, sqlRecs []common.Recommendation + computeErr, sqlErr error + ) + + g, gctx := errgroup.WithContext(ctx) + + if shouldIncludeService(params, common.ServiceCompute) { + g.Go(func() error { + client, err := computeengine.NewClient(gctx, r.projectID, region, r.clientOpts...) + if err != nil { + computeErr = err + return nil + } + computeRecs, computeErr = client.GetRecommendations(gctx, params) + return nil + }) + } + if shouldIncludeService(params, common.ServiceRelationalDB) { + g.Go(func() error { + client, err := cloudsql.NewClient(gctx, r.projectID, region, r.clientOpts...) 
+ if err != nil { + sqlErr = err + return nil + } + sqlRecs, sqlErr = client.GetRecommendations(gctx, params) + return nil + }) + } + _ = g.Wait() + + if computeErr != nil { + logging.Warnf("GCP %s compute recommendations: %v", region, computeErr) + } + if sqlErr != nil { + logging.Warnf("GCP %s cloudsql recommendations: %v", region, sqlErr) + } + + return regionResult{compute: computeRecs, sql: sqlRecs} +} + // GetRecommendationsForService retrieves GCP commitment recommendations for a specific service func (r *RecommendationsClientAdapter) GetRecommendationsForService(ctx context.Context, service common.ServiceType) ([]common.Recommendation, error) { params := common.RecommendationParams{ diff --git a/providers/gcp/recommendations_test.go b/providers/gcp/recommendations_test.go index 5e1d9a2f..c00435d3 100644 --- a/providers/gcp/recommendations_test.go +++ b/providers/gcp/recommendations_test.go @@ -2,9 +2,11 @@ package gcp import ( "context" + "os" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/LeanerCloud/CUDly/pkg/common" ) @@ -105,3 +107,72 @@ func TestRecommendationsClientAdapter_Fields(t *testing.T) { assert.Equal(t, "my-gcp-project", adapter.projectID) assert.Nil(t, adapter.clientOpts) } + +// TestRecommendationsClientAdapter_GetRecommendations_PropagatesContextCancellation +// pins the contract that GetRecommendations propagates ctx.Err() to its caller +// after the errgroup Wait() — the parent context being cancelled or its +// deadline exceeding must surface as an error rather than being swallowed by +// the per-region error-isolation goroutines (which all return nil to the +// errgroup so a single per-region failure does not cancel siblings). +// +// Without the explicit `if err := ctx.Err(); err != nil { return nil, err }` +// after the outer `g.Wait()`, callers that wrap GetRecommendations with a +// deadline could see "all regions finished cleanly" even when the deadline +// expired mid-fan-out (because every goroutine returned nil from its closure). +// +// Mirrors providers/azure/recommendations_test.go's +// TestRecommendationsClientAdapter_GetRecommendations_PropagatesContextCancellation +// and providers/aws/recommendations/client_test.go's +// TestGetAllRecommendations_PropagatesContextCancellation. +func TestRecommendationsClientAdapter_GetRecommendations_PropagatesContextCancellation(t *testing.T) { + adapter := &RecommendationsClientAdapter{ + ctx: context.Background(), + projectID: "test-project", + } + + // Cancel the context BEFORE the call so we don't depend on race-y timing + // inside the SDK clients. getRegions itself observes the cancelled ctx + // and returns the cancellation error wrapped via fmt.Errorf("failed to + // get regions: %w", err) — errors.Is unwraps that. (If a future refactor + // makes getRegions skip the ctx check, the post-Wait ctx.Err() block is + // still the safety net and the assertion still holds.) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := adapter.GetRecommendations(ctx, common.RecommendationParams{}) + require.Error(t, err, "expected context.Canceled to propagate from GetRecommendations") + assert.ErrorIs(t, err, context.Canceled, + "GetRecommendations must propagate the parent ctx error") +} + +// TestGCPRegionConcurrency pins the env-knob parsing for +// CUDLY_GCP_REGION_PARALLELISM. 
+func TestGCPRegionConcurrency(t *testing.T) { + t.Run("unset returns default", func(t *testing.T) { + t.Setenv("CUDLY_GCP_REGION_PARALLELISM", "") + assert.Equal(t, defaultGCPRegionConcurrency, gcpRegionConcurrency()) + }) + t.Run("positive integer overrides default", func(t *testing.T) { + t.Setenv("CUDLY_GCP_REGION_PARALLELISM", "25") + assert.Equal(t, 25, gcpRegionConcurrency()) + }) + t.Run("non-numeric falls back to default", func(t *testing.T) { + t.Setenv("CUDLY_GCP_REGION_PARALLELISM", "many") + assert.Equal(t, defaultGCPRegionConcurrency, gcpRegionConcurrency()) + }) + t.Run("zero falls back to default", func(t *testing.T) { + t.Setenv("CUDLY_GCP_REGION_PARALLELISM", "0") + assert.Equal(t, defaultGCPRegionConcurrency, gcpRegionConcurrency()) + }) + t.Run("negative falls back to default", func(t *testing.T) { + t.Setenv("CUDLY_GCP_REGION_PARALLELISM", "-3") + assert.Equal(t, defaultGCPRegionConcurrency, gcpRegionConcurrency()) + }) + // Sanity check: explicit unset path (Setenv only sets-then-restores; we + // also cover a real Unsetenv to make sure the os.Getenv == "" branch is + // exercised independently of the test framework's stash/restore). + t.Run("explicit unset returns default", func(t *testing.T) { + os.Unsetenv("CUDLY_GCP_REGION_PARALLELISM") + assert.Equal(t, defaultGCPRegionConcurrency, gcpRegionConcurrency()) + }) +} From 5a7a2adcb48bdc35544efce8bdd889e497beedc8 Mon Sep 17 00:00:00 2001 From: Cristian Magherusan-Stanciu Date: Mon, 4 May 2026 21:49:29 +0200 Subject: [PATCH 3/3] perf(scheduler): parallelize provider collection loop AWS/Azure/GCP (closes #268) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The provider loop in scheduler.go::CollectRecommendations ran AWS, Azure and GCP collection sequentially: total wall time = sum(per-provider) rather than max(per-provider). After #260 (async self-invoke) the user- facing 502 on POST /api/recommendations/refresh is gone, but the scheduler Lambda itself still pays the serial cost — visible as a longer "collection in flight" freshness-banner window and reduced cron-tick headroom. Provider-level fan-out under errgroup, mirroring the Azure and AWS service-loop parallelisations (#258 / #266). Each per-provider goroutine returns nil to the group so a single provider's error does not cancel its siblings — preserves the previous loop's `continue`-on-error semantics. Per-provider results are written into a map under a single mutex (the same pattern as fanOutPerAccount at scheduler.go:393). After Wait, ctx.Err() is checked and propagated so a canceled parent ctx surfaces as context.Canceled / context.DeadlineExceeded rather than being swallowed by the per-provider error-isolation goroutines (which all return nil). Deterministic merge: walk EnabledProviders in config order to assemble successfulProviders / successfulCollects / allRecommendations / totalSavings / failedProviders. Order is by config, NOT by goroutine completion order or by Go's randomised map iteration — keeps existing tests stable and the freshness-banner / notification-email content predictable. The new collectAllProviders helper extraction also keeps CollectRecommendations under the project's gocyclo gate (.golangci.yml min-complexity: 15) after the errgroup + post-Wait ctx.Err() block was added. 
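The deterministic config-order merge described above, in isolation as a
runnable sketch. mergeInConfigOrder and the plain error map are illustrative
stand-ins for collectAllProviders' outcome map; only the walk-the-config-slice
idea mirrors the real code:

    package main

    import (
        "errors"
        "fmt"
    )

    // Walk the EnabledProviders slice, never the outcomes map, so
    // ordering is immune to Go's randomised map iteration and to
    // goroutine completion order.
    func mergeInConfigOrder(enabled []string, outcomes map[string]error) ([]string, map[string]string) {
        ok := []string{}
        failed := map[string]string{}
        for _, name := range enabled {
            err, seen := outcomes[name]
            if !seen {
                continue // defensive: provider never reported an outcome
            }
            if err != nil {
                failed[name] = err.Error()
                continue
            }
            ok = append(ok, name)
        }
        return ok, failed
    }

    func main() {
        outcomes := map[string]error{
            "aws":   errors.New("no credentials"),
            "azure": nil,
            "gcp":   nil,
        }
        ok, failed := mergeInConfigOrder([]string{"gcp", "aws", "azure"}, outcomes)
        fmt.Println(ok, failed) // [gcp azure] map[aws:no credentials]
    }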
No concurrency cap on the outer fan-out — the universe is at most 3
providers; the per-account fan-out inside each provider is still bounded
by CUDLY_MAX_ACCOUNT_PARALLELISM (default 20) and per-region GCP fan-out
by CUDLY_GCP_REGION_PARALLELISM (default 10, from #267).

Test mocks updated: per-provider mock methods (CreateAndValidateProvider,
ListCloudAccounts, GetRecommendationsClient, GetAllRecommendations) are
now invoked from inside the errgroup goroutine, so they receive the
errgroup-derived gctx rather than the caller's ctx. Mock setups changed
from literal ctx to mock.Anything where the call sits inside the
goroutine path; calls made before/after the fan-out (GetGlobalConfig,
SendNewRecommendationsNotification, persistCollection writes) still pin
literal ctx because they run on the caller's context unchanged.

New TestScheduler_CollectRecommendations_ParallelProviders pins three
contracts:

1. successfulProviders ordering matches EnabledProviders config order,
   not goroutine completion order or alphabetical. Asserted with a
   deliberately non-alphabetical config (["gcp", "aws", "azure"]) where
   AWS fails (ambient credentials → factory mock returns error) and
   Azure/GCP succeed empty (no enabled accounts) — the resulting
   successfulProviders must be ["gcp", "azure"], distinguishable from
   any of: goroutine completion order, alphabetical, or arbitrary map
   iteration.
2. A single provider's error does not cancel siblings: in the same
   scenario the failed AWS entry lands in failedProviders while Azure
   and GCP still report success.
3. ctx cancellation propagates: a pre-cancelled ctx surfaces as
   context.Canceled (not a "successful but empty" CollectResult).

Expected wall-time impact: CollectRecommendations drops from
sum(per-provider) to max(per-provider). With AWS+Azure+GCP all enabled
and Azure typically the slowest at ~15-20s post-#258, total drops from
~30-45s to the ~15-20s of the slowest provider.
---
 internal/scheduler/scheduler.go      | 133 ++++++++++++++++++++-----
 internal/scheduler/scheduler_test.go | 143 +++++++++++++++++++++++----
 2 files changed, 232 insertions(+), 44 deletions(-)

diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index 3882e7dc..9e6641cb 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -168,33 +168,23 @@ func (s *Scheduler) CollectRecommendations(ctx context.Context) (*CollectResult,
 		return nil, err
 	}
 
-	// Collect recommendations from each enabled provider, tracking per-provider
-	// outcomes so persistence can scope stale-row eviction to (provider, account)
-	// pairs that actually ran. A partial collection (e.g. one of three Azure
-	// subscriptions failed) preserves the failed pairs' previous-cycle rows
-	// instead of wiping the whole provider's slice.
-	var allRecommendations []config.RecommendationRecord
-	var totalSavings float64
-	var successfulProviders []string
-	var successfulCollects []config.SuccessfulCollect
-	failedProviders := map[string]string{}
-
-	for _, providerName := range globalCfg.EnabledProviders {
-		logging.Infof("Collecting recommendations from %s...", providerName)
-
-		recs, succeededAccountIDs, err := s.collectProviderRecommendations(ctx, providerName, globalCfg)
-		if err != nil {
-			logging.Errorf("Failed to collect %s recommendations: %v", providerName, err)
-			failedProviders[providerName] = err.Error()
-			continue
-		}
-		successfulProviders = append(successfulProviders, providerName)
-		successfulCollects = append(successfulCollects, expandSuccessfulCollects(providerName, succeededAccountIDs)...)
-
-		for _, rec := range recs {
-			totalSavings += rec.Savings
-		}
-		allRecommendations = append(allRecommendations, recs...)
+ // Collect recommendations from each enabled provider concurrently, tracking + // per-provider outcomes so persistence can scope stale-row eviction to + // (provider, account) pairs that actually ran. A partial collection (e.g. + // one of three Azure subscriptions failed) preserves the failed pairs' + // previous-cycle rows instead of wiping the whole provider's slice. + // + // Provider-level fan-out under errgroup. Each goroutine returns nil to the + // group so a single provider's failure does not cancel siblings — matches + // the previous loop's `continue`-on-error behaviour. Per-provider results + // are written into a map under a single mutex; the merge then walks + // EnabledProviders in config order so successfulProviders ordering is + // deterministic regardless of goroutine completion order. After Wait, ctx + // cancellation is propagated. No concurrency cap — the universe is at + // most 3 providers. + allRecommendations, totalSavings, successfulProviders, successfulCollects, failedProviders, err := s.collectAllProviders(ctx, globalCfg) + if err != nil { + return nil, err } logging.Infof("Collected %d recommendations with $%.2f/month potential savings", @@ -254,6 +244,95 @@ func (s *Scheduler) CollectRecommendations(ctx context.Context) (*CollectResult, // On any failure (including partial), the collection error is additionally // recorded in recommendations_state so the frontend banner renders while // the user still sees the valid rows we managed to upsert. +// providerOutcome bundles a single provider's collection outcome for the +// deterministic merge in collectAllProviders. Only one of recs/err is +// meaningful per outcome — err != nil means the provider failed entirely +// (mirrors the pre-fan-out `continue` branch); err == nil means the provider +// succeeded (possibly partially — partial-account-failure semantics live in +// fanOutPerAccount, not here). +type providerOutcome struct { + recs []config.RecommendationRecord + succeededAccountIDs []string + err error +} + +// collectAllProviders fans out provider collection (AWS / Azure / GCP) under +// errgroup. Each goroutine returns nil so a single provider's error does not +// cancel siblings; per-provider results are written into a map under a single +// mutex. After Wait, ctx cancellation is propagated. The merge then walks +// EnabledProviders in config order so successfulProviders / successfulCollects +// / allRecommendations ordering is deterministic regardless of goroutine +// completion order — keeps existing tests stable. No concurrency cap; the +// universe is at most 3 providers. +// +// Extracted from CollectRecommendations to keep that function under the +// project's gocyclo gate (.golangci.yml min-complexity: 15) after the +// errgroup + post-Wait ctx.Err() block was added. 
+func (s *Scheduler) collectAllProviders(ctx context.Context, globalCfg *config.GlobalConfig) ( + allRecommendations []config.RecommendationRecord, + totalSavings float64, + successfulProviders []string, + successfulCollects []config.SuccessfulCollect, + failedProviders map[string]string, + err error, +) { + failedProviders = map[string]string{} + + var ( + mu sync.Mutex + outcomes = make(map[string]providerOutcome, len(globalCfg.EnabledProviders)) + ) + + g, gctx := errgroup.WithContext(ctx) + + for _, providerName := range globalCfg.EnabledProviders { + providerName := providerName // capture per-iteration + g.Go(func() error { + logging.Infof("Collecting recommendations from %s...", providerName) + recs, succeededAccountIDs, perr := s.collectProviderRecommendations(gctx, providerName, globalCfg) + mu.Lock() + outcomes[providerName] = providerOutcome{ + recs: recs, + succeededAccountIDs: succeededAccountIDs, + err: perr, + } + mu.Unlock() + return nil // error isolation: per-provider failures don't cancel siblings + }) + } + + // Wait for all goroutines. g.Wait() always returns nil because every + // goroutine returns nil. After Wait, propagate ctx cancellation so + // callers can distinguish "all providers completed" from "the parent + // ctx was canceled mid-fan-out". + _ = g.Wait() + if cerr := ctx.Err(); cerr != nil { + return nil, 0, nil, nil, nil, cerr + } + + // Deterministic merge: walk EnabledProviders in config order so + // successfulProviders ordering is independent of goroutine completion + // order — keeps existing tests stable. + for _, providerName := range globalCfg.EnabledProviders { + out, ok := outcomes[providerName] + if !ok { + continue + } + if out.err != nil { + logging.Errorf("Failed to collect %s recommendations: %v", providerName, out.err) + failedProviders[providerName] = out.err.Error() + continue + } + successfulProviders = append(successfulProviders, providerName) + successfulCollects = append(successfulCollects, expandSuccessfulCollects(providerName, out.succeededAccountIDs)...) + for _, rec := range out.recs { + totalSavings += rec.Savings + } + allRecommendations = append(allRecommendations, out.recs...) + } + return allRecommendations, totalSavings, successfulProviders, successfulCollects, failedProviders, nil +} + // clearCollectionStartedBestEffort clears last_collection_started_at on the // scheduler's exit path. Best-effort — a failure here is logged but does not // prevent returning the collection result. Extracted so CollectRecommendations diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index cb33929f..fc8ee994 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -560,7 +560,7 @@ func TestScheduler_CollectRecommendations_AWSProvider(t *testing.T) { mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil) // Mock provider factory to return error (simulating no credentials) - mockFactory.On("CreateAndValidateProvider", ctx, mock.Anything, mock.Anything). + mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything). Return(nil, assert.AnError) scheduler := &Scheduler{ @@ -591,7 +591,7 @@ func TestScheduler_CollectRecommendations_AllProviders(t *testing.T) { mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil) // Mock provider factory to return error for all providers (simulating no credentials) - mockFactory.On("CreateAndValidateProvider", ctx, mock.Anything, mock.Anything). 
+ mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything). Return(nil, assert.AnError) scheduler := &Scheduler{ @@ -607,6 +607,110 @@ func TestScheduler_CollectRecommendations_AllProviders(t *testing.T) { assert.Equal(t, 0, result.Recommendations) } +// TestScheduler_CollectRecommendations_ParallelProviders pins three contracts +// of the provider-level fan-out introduced in collectAllProviders (closes +// #268): +// +// 1. successfulProviders ordering matches EnabledProviders config order, NOT +// goroutine completion order. We assert this with a deliberately non- +// alphabetical config order ["gcp", "aws", "azure"] so the result is +// distinguishable from any of: input order, alphabetical, or arbitrary +// map iteration. +// 2. A single provider's error does not cancel siblings — when the mock +// factory returns an error for "azure" and successes for "aws"+"gcp", +// the result still includes the successful providers and reports +// azure in failedProviders. +// 3. ctx cancellation propagates: a pre-cancelled ctx surfaces as +// context.Canceled (not a "successful but empty" CollectResult). +func TestScheduler_CollectRecommendations_ParallelProviders(t *testing.T) { + t.Run("successfulProviders ordering matches config order, not goroutine completion", func(t *testing.T) { + ctx := context.Background() + mockStore := new(MockConfigStore) + mockEmail := new(MockEmailSender) + mockFactory := new(MockProviderFactory) + + // Deliberately non-alphabetical config order so we can distinguish + // "by config" from "by goroutine completion" from "alphabetical" + // from "by map-iteration order". + // + // MockConfigStore.ListCloudAccounts is a hardcoded stub that + // returns (nil, nil) — so Azure and GCP both hit their + // "no enabled accounts → return nil, nil, nil" early-return path + // (succeed with zero recommendations). AWS falls through to the + // ambient-credential collectAWSAmbient path which calls the + // provider factory; the mock returns assert.AnError so AWS lands + // in failedProviders. + // + // Expected merge under the deterministic-config-order walk: + // successfulProviders = ["gcp", "azure"] (skipping the failed + // "aws" entry, in + // config order) + // failedProviders = {"aws": "..."} + // + // If the merge instead walked the outcomes map (random Go map + // iteration), or sorted alphabetically, or used goroutine + // completion order, we'd see ["azure", "gcp"] or random ordering + // — distinguishable from the expected config-ordered result. + enabled := []string{"gcp", "aws", "azure"} + globalCfg := &config.GlobalConfig{ + EnabledProviders: enabled, + DefaultTerm: 3, + DefaultPayment: "all-upfront", + } + + mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil) + mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything). 
+ Return(nil, assert.AnError) + + scheduler := &Scheduler{ + config: mockStore, + email: mockEmail, + dashboardURL: "https://dashboard.example.com", + providerFactory: mockFactory, + } + + result, err := scheduler.CollectRecommendations(ctx) + require.NoError(t, err) + + assert.Equal(t, []string{"gcp", "azure"}, result.SuccessfulProviders, + "successfulProviders must be in EnabledProviders config order, "+ + "skipping the failed AWS entry; non-alphabetical config "+ + "distinguishes config-order from sort-order") + assert.Len(t, result.FailedProviders, 1) + assert.Contains(t, result.FailedProviders, "aws") + }) + + t.Run("ctx cancellation propagates", func(t *testing.T) { + mockStore := new(MockConfigStore) + mockEmail := new(MockEmailSender) + mockFactory := new(MockProviderFactory) + + globalCfg := &config.GlobalConfig{ + EnabledProviders: []string{"aws", "azure", "gcp"}, + } + // GetGlobalConfig is called pre-fan-out. We need it to succeed so + // we reach the fan-out, where the cancelled ctx is observed. + mockStore.On("GetGlobalConfig", mock.Anything).Return(globalCfg, nil) + mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything). + Return(nil, assert.AnError) + + scheduler := &Scheduler{ + config: mockStore, + email: mockEmail, + dashboardURL: "https://dashboard.example.com", + providerFactory: mockFactory, + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel BEFORE the call + + _, err := scheduler.CollectRecommendations(ctx) + require.Error(t, err, "expected context.Canceled to propagate from CollectRecommendations") + assert.ErrorIs(t, err, context.Canceled, + "CollectRecommendations must propagate the parent ctx error after the provider fan-out's g.Wait()") + }) +} + func TestScheduler_CollectRecommendations_UnknownProvider(t *testing.T) { ctx := context.Background() mockStore := new(MockConfigStore) @@ -643,7 +747,7 @@ func TestScheduler_CollectAWSRecommendations(t *testing.T) { } // Mock provider factory to return error (simulating no credentials) - mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything). + mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything). Return(nil, assert.AnError) scheduler := &Scheduler{ @@ -665,7 +769,7 @@ func TestScheduler_CollectAzureRecommendations_NoAccounts(t *testing.T) { DefaultPayment: "all-upfront", } - mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return([]config.CloudAccount{}, nil) + mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return([]config.CloudAccount{}, nil) scheduler := &Scheduler{config: mockStore} @@ -683,7 +787,7 @@ func TestScheduler_CollectGCPRecommendations_NoAccounts_Alt(t *testing.T) { DefaultPayment: "all-upfront", } - mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return([]config.CloudAccount{}, nil) + mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return([]config.CloudAccount{}, nil) scheduler := &Scheduler{config: mockStore} @@ -703,10 +807,10 @@ func TestScheduler_CollectProviderRecommendations(t *testing.T) { } // AWS ambient fallback: factory returns error - mockFactory.On("CreateAndValidateProvider", ctx, mock.Anything, mock.Anything). + mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything). 
Return(nil, assert.AnError) // Azure/GCP: no accounts → skip gracefully - mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return([]config.CloudAccount{}, nil) + mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return([]config.CloudAccount{}, nil) scheduler := &Scheduler{ config: mockStore, @@ -753,7 +857,7 @@ func TestScheduler_CollectRecommendations_WithNotification(t *testing.T) { mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil) // Mock provider factory to return error (simulating no credentials) - mockFactory.On("CreateAndValidateProvider", ctx, mock.Anything, mock.Anything). + mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything). Return(nil, assert.AnError) // No expectation for SendNewRecommendationsNotification because // there are no recommendations @@ -1455,7 +1559,7 @@ func TestScheduler_CollectAWSRecommendations_Success(t *testing.T) { }, } - mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil) + mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil) mockProvider.On("GetRecommendationsClient", ctx).Return(mockRecClient, nil) mockRecClient.On("GetAllRecommendations", ctx).Return(recommendations, nil) @@ -1482,7 +1586,7 @@ func TestScheduler_CollectAWSRecommendations_RecClientError(t *testing.T) { DefaultPayment: "all-upfront", } - mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil) + mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil) mockProvider.On("GetRecommendationsClient", ctx).Return(nil, assert.AnError) scheduler := &Scheduler{ @@ -1508,7 +1612,7 @@ func TestScheduler_CollectAWSRecommendations_GetRecsError(t *testing.T) { DefaultPayment: "all-upfront", } - mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil) + mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil) mockProvider.On("GetRecommendationsClient", ctx).Return(mockRecClient, nil) mockRecClient.On("GetAllRecommendations", ctx).Return(nil, assert.AnError) @@ -1542,7 +1646,7 @@ func TestScheduler_CollectAzureRecommendations_Success(t *testing.T) { Enabled: true, }, } - mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return(azureAccounts, nil) + mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return(azureAccounts, nil) // The managed_identity path will try DefaultAzureCredential which will // fail in tests, so we expect an error log but no crash. 
@@ -1567,7 +1671,7 @@ func TestScheduler_CollectGCPRecommendations_NoAccounts(t *testing.T) { DefaultPayment: "all-upfront", } - mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return([]config.CloudAccount{}, nil) + mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return([]config.CloudAccount{}, nil) scheduler := &Scheduler{ config: mockStore, @@ -1606,9 +1710,14 @@ func TestScheduler_CollectRecommendations_WithSuccessfulRecs(t *testing.T) { } mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil) - mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil) - mockProvider.On("GetRecommendationsClient", ctx).Return(mockRecClient, nil) - mockRecClient.On("GetAllRecommendations", ctx).Return(recommendations, nil) + mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil) + // Provider/RecClient are invoked from inside the per-provider errgroup + // goroutine in collectAllProviders, so they receive the errgroup-derived + // gctx rather than the caller's ctx. mock.Anything keeps the assertion + // resilient to that wrap (the post-Wait ctx.Err() check + cancellation + // contract test cover the cancellation path). + mockProvider.On("GetRecommendationsClient", mock.Anything).Return(mockRecClient, nil) + mockRecClient.On("GetAllRecommendations", mock.Anything).Return(recommendations, nil) mockEmail.On("SendNewRecommendationsNotification", ctx, mock.AnythingOfType("email.NotificationData")).Return(nil) scheduler := &Scheduler{ @@ -1652,7 +1761,7 @@ func TestScheduler_CollectAWSRecommendations_FallbackToFiltered(t *testing.T) { }, } - mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil) + mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil) mockProvider.On("GetRecommendationsClient", ctx).Return(mockRecClient, nil) mockRecClient.On("GetAllRecommendations", ctx).Return([]common.Recommendation{}, nil) // Empty mockRecClient.On("GetRecommendations", ctx, mock.AnythingOfType("common.RecommendationParams")).Return(filteredRecommendations, nil)