From aff26b87e891f93522532b1d9db28c4022470ee0 Mon Sep 17 00:00:00 2001 From: Cristian Magherusan-Stanciu Date: Mon, 4 May 2026 15:38:07 +0200 Subject: [PATCH 1/2] perf(azure): parallelize 5 sequential service calls in GetRecommendations (closes #258) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The dispatcher in providers/azure/recommendations.go::GetRecommendations was running 5 service calls (compute / database / cache / cosmosdb / advisor) one after another. Each per-service call internally hits two slow Azure endpoints (ARM Consumption Reservation Recommendations ~5-15s, prices.azure.com retail pricing pagination ~5-10s), totalling 50-125s end-to-end. This consistently exceeded the API Lambda's 60s timeout, producing 502s on user-triggered refresh. The five calls have no inter-service dependency, so they can run concurrently under errgroup.WithContext. Each goroutine captures its own error in a closure-scoped variable and returns nil to the group so that one service's failure does not cancel its siblings (matching the previous sequential behaviour where each error was logged via logging.Warnf and the loop continued). Results are appended in the original deterministic order (compute → database → cache → cosmosdb → advisor) after Wait() so any order-sensitive callers stay stable. Expected end-to-end after this change: ~max(individual call durations) ≈ 15-20s, well within the 60s timeout. 
--- providers/azure/go.mod | 5 +- providers/azure/go.sum | 2 + providers/azure/recommendations.go | 104 ++++++++++++++++++++++------- 3 files changed, 83 insertions(+), 28 deletions(-) diff --git a/providers/azure/go.mod b/providers/azure/go.mod index e223e198..3c0e05fd 100644 --- a/providers/azure/go.mod +++ b/providers/azure/go.mod @@ -1,8 +1,6 @@ module github.com/LeanerCloud/CUDly/providers/azure -go 1.23.0 - -toolchain go1.24.4 +go 1.25.0 require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.1 @@ -18,6 +16,7 @@ require ( github.com/LeanerCloud/CUDly/pkg v0.0.0 github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.11.1 + golang.org/x/sync v0.20.0 ) require ( diff --git a/providers/azure/go.sum b/providers/azure/go.sum index fb36bd85..1a0d2760 100644 --- a/providers/azure/go.sum +++ b/providers/azure/go.sum @@ -72,6 +72,8 @@ golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= diff --git a/providers/azure/recommendations.go b/providers/azure/recommendations.go index e53c6184..3298326d 100644 --- a/providers/azure/recommendations.go +++ b/providers/azure/recommendations.go @@ -9,11 +9,13 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/advisor/armadvisor" + "golang.org/x/sync/errgroup" "github.com/LeanerCloud/CUDly/pkg/common" 
"github.com/LeanerCloud/CUDly/pkg/logging" "github.com/LeanerCloud/CUDly/providers/azure/services/cache" "github.com/LeanerCloud/CUDly/providers/azure/services/compute" + "github.com/LeanerCloud/CUDly/providers/azure/services/cosmosdb" "github.com/LeanerCloud/CUDly/providers/azure/services/database" ) @@ -55,48 +57,100 @@ func NewRecommendationsClientAdapter(cred azcore.TokenCredential, subscriptionID // blank on the client — converters must populate Region from the response data // (see known_issues/10_azure_provider.md CRITICAL "Recommendation converters // ignore the API response entirely" for the matching converter work). +// +// All five service calls run concurrently under errgroup. Each goroutine captures +// its own error and returns nil to the group so that a single service failure +// does not cancel sibling calls. Results are appended in a deterministic order +// (compute → database → cache → cosmosdb → advisor) after all goroutines finish. func (r *RecommendationsClientAdapter) GetRecommendations(ctx context.Context, params common.RecommendationParams) ([]common.Recommendation, error) { - allRecommendations := make([]common.Recommendation, 0) + var ( + computeRecs, dbRecs, cacheRecs, cosmosRecs, advisorRecs []common.Recommendation + computeErr, dbErr, cacheErr, cosmosErr, advisorErr error + ) + + g, gctx := errgroup.WithContext(ctx) // Compute (VM) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceCompute) { - computeClient := compute.NewClient(r.cred, r.subscriptionID, "") - computeRecs, err := computeClient.GetRecommendations(ctx, params) - if err != nil { - logging.Warnf("Azure compute recommendations: %v", err) - } else { - allRecommendations = append(allRecommendations, computeRecs...) 
- } + g.Go(func() error { + computeClient := compute.NewClient(r.cred, r.subscriptionID, "") + computeRecs, computeErr = computeClient.GetRecommendations(gctx, params) + return nil // error isolation: never propagate to errgroup + }) } // Database (SQL) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceRelationalDB) { - dbClient := database.NewClient(r.cred, r.subscriptionID, "") - dbRecs, err := dbClient.GetRecommendations(ctx, params) - if err != nil { - logging.Warnf("Azure database recommendations: %v", err) - } else { - allRecommendations = append(allRecommendations, dbRecs...) - } + g.Go(func() error { + dbClient := database.NewClient(r.cred, r.subscriptionID, "") + dbRecs, dbErr = dbClient.GetRecommendations(gctx, params) + return nil + }) } // Cache (Redis) recommendations — subscription-wide. if shouldIncludeService(params, common.ServiceCache) { - cacheClient := cache.NewClient(r.cred, r.subscriptionID, "") - cacheRecs, err := cacheClient.GetRecommendations(ctx, params) - if err != nil { - logging.Warnf("Azure cache recommendations: %v", err) - } else { - allRecommendations = append(allRecommendations, cacheRecs...) - } + g.Go(func() error { + cacheClient := cache.NewClient(r.cred, r.subscriptionID, "") + cacheRecs, cacheErr = cacheClient.GetRecommendations(gctx, params) + return nil + }) + } + + // CosmosDB (NoSQL) recommendations — subscription-wide. + if shouldIncludeService(params, common.ServiceNoSQL) { + g.Go(func() error { + cosmosClient := cosmosdb.NewClient(r.cred, r.subscriptionID, "") + cosmosRecs, cosmosErr = cosmosClient.GetRecommendations(gctx, params) + return nil + }) } // Azure Advisor adds cross-cutting cost recommendations independent of the // per-service Reservation API. Failures here are non-fatal — the per-service // results above are still useful on their own. 
- advisorRecs, err := r.getAdvisorRecommendations(ctx, params) - if err != nil { - logging.Errorf("Failed to get Azure Advisor recommendations: %v", err) + g.Go(func() error { + advisorRecs, advisorErr = r.getAdvisorRecommendations(gctx, params) + return nil + }) + + // Wait for all goroutines. g.Wait() always returns nil because every goroutine + // returns nil — errors are captured in per-service variables above. + _ = g.Wait() + + // Log per-service errors (matches previous sequential behaviour). Append + // results in the same deterministic order as the original sequential code + // (compute → database → cache → cosmosdb → advisor) so that any + // order-sensitive callers remain stable. + allRecommendations := make([]common.Recommendation, 0, + len(computeRecs)+len(dbRecs)+len(cacheRecs)+len(cosmosRecs)+len(advisorRecs)) + + if computeErr != nil { + logging.Warnf("Azure compute recommendations: %v", computeErr) + } else { + allRecommendations = append(allRecommendations, computeRecs...) + } + + if dbErr != nil { + logging.Warnf("Azure database recommendations: %v", dbErr) + } else { + allRecommendations = append(allRecommendations, dbRecs...) + } + + if cacheErr != nil { + logging.Warnf("Azure cache recommendations: %v", cacheErr) + } else { + allRecommendations = append(allRecommendations, cacheRecs...) + } + + if cosmosErr != nil { + logging.Warnf("Azure cosmosdb recommendations: %v", cosmosErr) + } else { + allRecommendations = append(allRecommendations, cosmosRecs...) + } + + if advisorErr != nil { + logging.Errorf("Failed to get Azure Advisor recommendations: %v", advisorErr) } else { allRecommendations = append(allRecommendations, advisorRecs...) } From 7c43b05350eec1362e406efbccf5d5b48a08d78c Mon Sep 17 00:00:00 2001 From: Cristian Magherusan-Stanciu Date: Mon, 4 May 2026 16:26:57 +0200 Subject: [PATCH 2/2] fix(azure): propagate ctx cancellation + pin contract test (CR pass-1, #258) Two CodeRabbit findings on PR #259: 1. 
providers/azure/recommendations.go::GetRecommendations now checks ctx.Err() after g.Wait() and propagates context.Canceled / context.DeadlineExceeded to callers. Without this, every per- service goroutine returns nil to the errgroup (intentional, for error isolation across services), so the parent ctx being cancelled mid-fan-out would silently produce "all services completed cleanly" when in fact the caller's deadline expired. 2. New test TestRecommendationsClientAdapter_GetRecommendations_ PropagatesContextCancellation pins the contract: calling GetRecommendations with a pre-cancelled context must return context.Canceled, not nil. Result-merging extracted into mergeServiceResults helper + serviceResult struct so GetRecommendations stays under the project's cyclomatic-complexity gate after the post-Wait ctx.Err() block was added. Behaviour is identical to the previous inline merge (per- service errors logged in the same order with the same severity, then successful results appended in canonical compute -> database -> cache -> cosmosdb -> advisor order). CR also requested a concurrency timing test using fake clients per service to assert total elapsed approx max-single-client (proving the errgroup actually parallelises). Deferred to a follow-up issue because the existing service-client constructors build concrete Azure SDK types with no injection seam. --- providers/azure/recommendations.go | 81 ++++++++++++++----------- providers/azure/recommendations_test.go | 31 ++++++++++ 2 files changed, 77 insertions(+), 35 deletions(-) diff --git a/providers/azure/recommendations.go b/providers/azure/recommendations.go index 3298326d..ffb773d9 100644 --- a/providers/azure/recommendations.go +++ b/providers/azure/recommendations.go @@ -114,48 +114,59 @@ func (r *RecommendationsClientAdapter) GetRecommendations(ctx context.Context, p return nil }) - // Wait for all goroutines. 
g.Wait() always returns nil because every goroutine - // returns nil — errors are captured in per-service variables above. + // Wait for all goroutines. g.Wait() always returns nil because every + // goroutine returns nil — errors are captured in per-service variables + // above. After Wait, propagate ctx cancellation so callers can distinguish + // "all five services completed (with possibly per-service errors)" from + // "the parent ctx was canceled mid-fan-out". Without this check the + // fan-out could swallow a deadline-exceeded error that the caller expected to + // see. _ = g.Wait() - - // Log per-service errors (matches previous sequential behaviour). Append - // results in the same deterministic order as the original sequential code - // (compute → database → cache → cosmosdb → advisor) so that any - // order-sensitive callers remain stable. - allRecommendations := make([]common.Recommendation, 0, - len(computeRecs)+len(dbRecs)+len(cacheRecs)+len(cosmosRecs)+len(advisorRecs)) - - if computeErr != nil { - logging.Warnf("Azure compute recommendations: %v", computeErr) - } else { - allRecommendations = append(allRecommendations, computeRecs...) + if err := ctx.Err(); err != nil { + return nil, err } - if dbErr != nil { - logging.Warnf("Azure database recommendations: %v", dbErr) - } else { - allRecommendations = append(allRecommendations, dbRecs...) - } + return mergeServiceResults(serviceResult{"compute", computeRecs, computeErr}, + serviceResult{"database", dbRecs, dbErr}, + serviceResult{"cache", cacheRecs, cacheErr}, + serviceResult{"cosmosdb", cosmosRecs, cosmosErr}, + serviceResult{"advisor", advisorRecs, advisorErr}), nil +} - if cacheErr != nil { - logging.Warnf("Azure cache recommendations: %v", cacheErr) - } else { - allRecommendations = append(allRecommendations, cacheRecs...) - } +// serviceResult bundles a per-service collection outcome for the deterministic +// merge in mergeServiceResults.
Extracted into a helper so GetRecommendations +// stays under the cyclomatic-complexity gate after the post-Wait ctx.Err() +// propagation was added. +type serviceResult struct { + name string + recs []common.Recommendation + err error +} - if cosmosErr != nil { - logging.Warnf("Azure cosmosdb recommendations: %v", cosmosErr) - } else { - allRecommendations = append(allRecommendations, cosmosRecs...) +// mergeServiceResults logs per-service errors (matches the previous sequential +// behaviour where each error was logged inline via logging.Warnf) and appends +// successful results in the order the slice is passed — callers must preserve +// the canonical compute → database → cache → cosmosdb → advisor order so that +// order-sensitive consumers remain stable. The advisor entry's error is logged +// via logging.Errorf to match the pre-parallelisation severity. +func mergeServiceResults(results ...serviceResult) []common.Recommendation { + total := 0 + for _, r := range results { + total += len(r.recs) } - - if advisorErr != nil { - logging.Errorf("Failed to get Azure Advisor recommendations: %v", advisorErr) - } else { - allRecommendations = append(allRecommendations, advisorRecs...) + out := make([]common.Recommendation, 0, total) + for _, r := range results { + if r.err != nil { + if r.name == "advisor" { + logging.Errorf("Failed to get Azure Advisor recommendations: %v", r.err) + } else { + logging.Warnf("Azure %s recommendations: %v", r.name, r.err) + } + continue + } + out = append(out, r.recs...) 
} - - return allRecommendations, nil + return out } // GetRecommendationsForService retrieves Azure reservation recommendations for a specific service diff --git a/providers/azure/recommendations_test.go b/providers/azure/recommendations_test.go index 2cf52403..b2eb251d 100644 --- a/providers/azure/recommendations_test.go +++ b/providers/azure/recommendations_test.go @@ -216,6 +216,37 @@ func TestRecommendationsClientAdapter_GetAllRecommendations(t *testing.T) { _, _ = adapter.GetAllRecommendations(context.Background()) } +// TestRecommendationsClientAdapter_GetRecommendations_PropagatesContextCancellation +// pins the contract that GetRecommendations propagates ctx.Err() to its caller +// after the errgroup Wait() — the parent context being cancelled or its +// deadline exceeding must surface as an error rather than being swallowed by +// the per-service error-isolation goroutines (which all return nil to the +// errgroup so a single per-service failure does not cancel siblings). +// +// Without the explicit `if err := ctx.Err(); err != nil { return nil, err }` +// after `g.Wait()`, callers that wrap GetRecommendations with a deadline could +// see "all services finished cleanly" even when the deadline expired +// mid-fan-out (because every goroutine returned nil from its closure). +func TestRecommendationsClientAdapter_GetRecommendations_PropagatesContextCancellation(t *testing.T) { + adapter := &RecommendationsClientAdapter{ + cred: &mockAzureTokenCredential{}, + subscriptionID: "test-subscription", + } + + // Cancel the context BEFORE the call so we don't depend on race-y timing + // inside the SDK clients. The Azure clients constructed inside the + // goroutines will observe the cancelled gctx (derived from the parent ctx + // via errgroup.WithContext) and either short-circuit or return cancelled + // errors; either way, our post-Wait ctx.Err() check returns context.Canceled. 
+ ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := adapter.GetRecommendations(ctx, common.RecommendationParams{}) + require.Error(t, err, "expected context.Canceled to propagate from GetRecommendations") + assert.ErrorIs(t, err, context.Canceled, + "GetRecommendations must propagate the parent ctx error after g.Wait()") +} + func TestExtractServiceType(t *testing.T) { tests := []struct { name string