perf: aggregate parallelism cap + 2 GB Lambda for collect refresh (follow-up to #269) #270
Conversation
The recommendations-collection fan-out has up to four nested levels:
provider → account → service|region → per-region service. Each level was
independently capped, so peak goroutine counts multiplied through the
tree (3 providers × 20 accounts × 30 GCP regions × 2 services = 3,600
in-flight gRPC/HTTP clients on a real multi-account deploy). On a 512 MB
Lambda this exhausted memory before the work could finish — observed in
dev as `signal: killed` at Max Memory Used = 512 MB after ~28s, with the
runtime never reaching the 60s timeout.
A single semaphore stashed on ctx now lets every leaf goroutine (the
goroutine that issues the actual cloud-API call) Acquire one slot before
doing IO and Release after, so aggregate concurrent IO is hard-bounded
by CUDLY_MAX_PARALLELISM (default 20) regardless of how nested the
dispatch is. Intermediate dispatchers (provider, account, GCP region)
do NOT acquire — they only launch sub-goroutines — so no goroutine can
deadlock by holding a permit while waiting for sub-permits.
Mechanics:
- New pkg/concurrency package: WithSharedSemaphore / SharedSemaphore /
Acquire / Release helpers that read the semaphore from ctx, plus
MaxParallelismFromEnv reading CUDLY_MAX_PARALLELISM (default 20).
Acquire/Release are no-ops when no semaphore is on ctx — CLI tools
and unit tests skip the semaphore entirely without per-call branching.
- Scheduler.CollectRecommendations allocates the semaphore (size from
env) and attaches to ctx via concurrency.WithSharedSemaphore BEFORE
the provider fan-out, so the wrapped ctx flows through every nested
level. Logs the effective cap on entry.
- Three leaf fan-outs wired:
- providers/aws/recommendations/client.go::GetAllRecommendations —
5 service goroutines (EC2 / RDS / ElastiCache / OpenSearch /
Redshift)
- providers/azure/recommendations.go::GetRecommendations — 5
service goroutines (compute / database / cache / cosmosdb /
advisor); Acquire/Release boilerplate extracted into a goService
helper so the function stays under gocyclo's 10-branch gate.
- providers/gcp/recommendations.go::collectRegion — 2 sub-fan-out
goroutines (compute / cloudsql) per region
Each leaf calls Acquire(gctx) at the top, defers Release(gctx). On
Acquire failure (parent ctx cancelled mid-wait) the leaf's per-service
err variable is set to ctx.Err() and the goroutine returns nil to the
errgroup — preserves the documented error-isolation contract.
Test mocks updated: persistCollection's UpsertRecommendations and the
notification email's SendNewRecommendationsNotification both run inside
CollectRecommendations after the ctx is wrapped, so their mock setups
were pinning the original (unwrapped) ctx. mock.Anything is the
appropriate looseness — the wrapped ctx is an implementation detail.
New pkg/concurrency tests pin:
- MaxParallelismFromEnv env-knob parser (unset / positive / zero /
negative / non-numeric / explicit-unset)
- Acquire/Release are no-ops when no semaphore on ctx
- WithSharedSemaphore returns ctx unchanged when sem is nil
- 20 goroutines under cap=3 never see >3 in-flight (load-bearing)
- Acquire returns ctx.Err() when cancelled mid-wait
This is a smoke-test config — the user's dev deploy hit OOM with the
existing per-level caps; CUDLY_MAX_PARALLELISM=20 (paired with a Lambda
memory bump to 2 GB in a follow-up commit) should give the dev refresh
headroom to complete. Operators can dial up further via env override.
The dev Lambda hit Max Memory Used = 512 MB and was killed by the runtime at ~28s (Duration 28701ms / Timeout 60s) on a user-triggered recommendations refresh. The collection fan-out (post-#266 #267 #268) keeps many concurrent gRPC/HTTP clients in flight — 30+ GCP regions × 2 services each, plus 5 AWS services and 5 Azure services — and 512 MB is too tight for that working set even with the new shared-semaphore cap.

This is a temporary headroom bump scoped to dev so the user can verify the post-#266/#267/#268/concurrency-semaphore stack actually completes end-to-end on a deployed Lambda. The shared semaphore (preceding commit) is the durable fix for peak concurrency; this just gives the function room to breathe while we observe the new behaviour live.

Only github-dev.tfvars is touched. github-staging.tfvars and github-prod.tfvars stay at 1024 MB — they don't exercise every provider with mostly-broken credentials the way dev does, and bumping prod without first observing dev would be premature. dev.tfvars (the user's local-apply config, gitignored) is also bumped locally to keep parity with the CI deploy until this lands.
📝 Walkthrough

A new package provides a process-wide, environment-configurable weighted semaphore. The scheduler installs a shared semaphore on the context, and AWS/Azure/GCP provider code acquires/releases permits around their leaf cloud-API calls to cap aggregate parallelism while preserving per-service error isolation.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Scheduler
    participant Concurrency as Concurrency Package
    participant Context as Context
    participant Provider as Provider (AWS/Azure/GCP)
    participant CloudAPI as Cloud API
    Scheduler->>Concurrency: MaxParallelismFromEnv()
    activate Concurrency
    Concurrency-->>Scheduler: numeric cap
    deactivate Concurrency
    Scheduler->>Concurrency: WithSharedSemaphore(ctx, sem)
    activate Concurrency
    Concurrency->>Context: attach semaphore
    Concurrency-->>Scheduler: wrapped ctx
    deactivate Concurrency
    Scheduler->>Provider: CollectRecommendations(wrapped ctx)
    rect rgba(100, 150, 200, 0.5)
        note over Provider: service/region fan-out
        par Service A
            Provider->>Concurrency: Acquire(wrapped ctx)
            activate Concurrency
            Concurrency-->>Provider: permit acquired (or wait)
            deactivate Concurrency
            Provider->>CloudAPI: Leaf API call
            CloudAPI-->>Provider: response/error
            Provider->>Concurrency: Release(wrapped ctx)
        and Service B
            Provider->>Concurrency: Acquire(wrapped ctx)
            activate Concurrency
            Concurrency-->>Provider: permit acquired (or wait)
            deactivate Concurrency
            Provider->>CloudAPI: Leaf API call
            CloudAPI-->>Provider: response/error
            Provider->>Concurrency: Release(wrapped ctx)
        end
    end
    Provider-->>Scheduler: merged results/errors
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 5 passed
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/concurrency/concurrency_test.go`:
- Around line 82-95: The worker goroutines call require.NoError(t, Acquire(ctx))
which is illegal from non-test goroutines; instead, have each goroutine capture
the error and report it back to the main test goroutine (e.g., send errors on an
err channel or append to a slice with a mutex) and only call require.NoError
from the main goroutine after wg.Wait(); also ensure Release(ctx) is only
deferred when Acquire returned nil (successful) so the goroutine sends the
Acquire error (or nil) back, main goroutine closes the channel after wg.Wait()
and asserts NoError for each reported error. Reference Acquire, Release,
inflight, updatePeak, goroutines, and wg to locate the logic to change.
In `@providers/aws/recommendations/client.go`:
- Around line 196-244: The current goroutines (the g.Go blocks) acquire a global
semaphore before calling GetRecommendationsForService, which holds a slot for
the entire service sweep; move the semaphore boundary into the Cost Explorer
call itself so each outbound CE request acquires/releases a permit. Remove
concurrency.Acquire/Release from the outer g.Go closures and instead update
GetRecommendationsForService (or the helper it calls) to call
concurrency.Acquire(ctx) immediately before each individual Cost Explorer API
invocation and defer concurrency.Release(ctx) right after that request/response
(including retries/backoffs around only the API call). This ensures the
semaphore limits concurrent CE calls, not per-service sweeps.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1a60a4b4-4a48-44a2-8222-e2c11a8bf20c
⛔ Files ignored due to path filters (2)
- pkg/go.sum is excluded by !**/*.sum
- providers/gcp/go.sum is excluded by !**/*.sum
📒 Files selected for processing (10)
- internal/scheduler/scheduler.go
- internal/scheduler/scheduler_test.go
- pkg/concurrency/concurrency.go
- pkg/concurrency/concurrency_test.go
- pkg/go.mod
- providers/aws/recommendations/client.go
- providers/azure/recommendations.go
- providers/gcp/go.mod
- providers/gcp/recommendations.go
- terraform/environments/aws/github-dev.tfvars
Two CodeRabbit findings on PR #270:

1. **`pkg/concurrency/concurrency_test.go` (CRITICAL)**: `require.NoError` was called from inside the worker goroutines in `TestSharedSemaphore_BoundsConcurrency`. Testify's documented contract is that `require.*` / `FailNow`-style helpers must run on the test's own goroutine — calling them from a worker uses `runtime.Goexit` on the worker instead of safely stopping the test, which can hang or skip cleanup and produces race-detector noise. Workers now capture their Acquire result on a buffered `errCh` and the main goroutine drains the channel and asserts after `wg.Wait()`. `Release` is deferred only on a successful Acquire, matching the documented pairing contract.

2. **`providers/aws/recommendations/client.go` (MAJOR)**: The per-service `g.Go` goroutines in `GetAllRecommendations` were calling `concurrency.Acquire`/`Release` around the whole `GetRecommendationsForService` sweep — but that helper internally loops over 2 × 3 (term, payment) variants and waits on the rate-limiter inside each retry, so a throttled service would hold a global slot for seconds while no Cost Explorer request was actually in flight. Effective parallelism dropped well below `CUDLY_MAX_PARALLELISM`. The Acquire/Release boundary moves down to each individual `costExplorerClient.GetReservationPurchaseRecommendation` call (and the SP equivalent in `parser_sp.go`). The rate-limiter's `Wait` happens *outside* the permit, so a goroutine waiting on exponential backoff doesn't tie up a slot. Each retry acquires a fresh permit, so throughput now scales with the cap regardless of how many services are individually throttled.
Tests:
- `go test ./recommendations/...` from `providers/aws/` — pass
- `go test -race ./concurrency/...` from `pkg/` — race-clean
- Existing `go test -race ./recommendations/...` reveals a pre-existing data race on `Client.rateLimiter` (concurrent `Reset` / state mutation across the per-service goroutines), introduced by the parallelisation in #269 (`defac6c2a`). Filed as #271 — out of scope for this CR-fix commit; the fix is straightforward (per-goroutine rate-limiter or internal mutex).

Other CodeRabbit checks (Azure / GCP per-service goroutines holding permits across SDK calls) are NOT applied here because:
- Azure's per-service SDK calls don't have an explicit rate-limiter Wait loop equivalent to AWS's — the inner ARM/pricing calls block on network IO, not exponential backoff.
- GCP's `collectRegion` sub-fan-out goroutines do similarly direct SDK calls without an explicit Wait loop.
- CR didn't flag those layers; if a future review surfaces the same argument with concrete throughput evidence, we can mirror the AWS pattern there.
Addressed both CR findings in 092e0b3 (the `require.NoError`-in-goroutine test fix and the per-call Acquire/Release boundary in the AWS client).

Out-of-scope finding surfaced during validation: the pre-existing `Client.rateLimiter` data race, filed as #271.

Azure/GCP not changed — CR's specific argument (permit held across a rate-limiter Wait loop) is structural to AWS's Cost Explorer client, which has an explicit rate-limiter `Wait` retry loop; the Azure and GCP leaf calls block on network IO instead.

@coderabbitai review
The Azure/GCP non-change rationale is sound — the concern is specific to the explicit rate-limiter `Wait` loop in the AWS Cost Explorer path. Good catch on the pre-existing `rateLimiter` race. Triggering a fresh review of the updated commit.
@coderabbitai review

✅ Actions performed: Review triggered.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
providers/aws/recommendations/client.go (1)
74-100: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Make the retry limiter request-scoped, not client-scoped.
`GetAllRecommendations` fans out multiple service sweeps concurrently, but this path still mutates the shared `c.rateLimiter` via `Reset`/`Wait`/`ShouldRetry`/`GetRetryCount`. That lets concurrent requests clobber each other's retry/backoff state and matches the race already seen under `go test -race`. Please instantiate a fresh limiter per recommendation sweep here, and mirror that change in the Savings Plans path too.

Suggested direction
```diff
-	c.rateLimiter.Reset()
+	rateLimiter := NewRateLimiter()
+	rateLimiter.Reset()
 	for {
-		if waitErr := c.rateLimiter.Wait(ctx); waitErr != nil {
+		if waitErr := rateLimiter.Wait(ctx); waitErr != nil {
 			return nil, fmt.Errorf("rate limiter wait failed: %w", waitErr)
 		}
 		if acqErr := concurrency.Acquire(ctx); acqErr != nil {
 			return nil, fmt.Errorf("concurrency acquire failed: %w", acqErr)
 		}
 		result, err = c.costExplorerClient.GetReservationPurchaseRecommendation(ctx, input)
 		concurrency.Release(ctx)
-		if !c.rateLimiter.ShouldRetry(err) {
+		if !rateLimiter.ShouldRetry(err) {
 			break
 		}
 	}
 	if err != nil {
-		return nil, fmt.Errorf("failed to get RI recommendations after %d retries: %w", c.rateLimiter.GetRetryCount(), err)
+		return nil, fmt.Errorf("failed to get RI recommendations after %d retries: %w", rateLimiter.GetRetryCount(), err)
 	}
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@providers/aws/recommendations/client.go` around lines 74-100: the shared c.rateLimiter is being mutated across concurrent sweeps; make the limiter request-scoped by creating a fresh local limiter instance at the start of the recommendation loop and use that instead of c.rateLimiter. Concretely, before the for-loop in GetReservationPurchaseRecommendation create a copy/localLimiter (e.g., via a Clone/Copy/New method or by constructing a new limiter with the same config), call localLimiter.Reset()/localLimiter.Wait()/localLimiter.ShouldRetry()/localLimiter.GetRetryCount() inside the loop, and remove mutations of c.rateLimiter; then apply the same change to the Savings Plans path (GetSavingsPlanPurchaseRecommendation) and ensure callers in GetAllRecommendations continue to fan out safely using per-request limiters.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f79e1acf-ebc6-4472-adec-8a90d306780f
📒 Files selected for processing (3)
- pkg/concurrency/concurrency_test.go
- providers/aws/recommendations/client.go
- providers/aws/recommendations/parser_sp.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/concurrency/concurrency_test.go
Summary
Follow-up to #269 (`defac6c2a`) — when that PR was deployed to dev, the new fan-out parallelism exposed two operational problems we couldn't see on the previous serial code path:

- OOM (`Max Memory Used: 512 MB`, `signal: killed`) on the user-triggered refresh — peak in-flight gRPC/HTTP clients across the now-concurrent provider × account × service|region tree exceeded the 512 MB cap.
- Timeout (`Task timed out after 63.05 seconds`) — same root cause; without a global cap, runaway concurrent IO never settles inside the budget.
Two atomic commits address both:

- `perf(concurrency)` — new `pkg/concurrency` package adds a shared semaphore stashed on ctx. Every leaf goroutine across AWS / Azure / GCP fan-outs Acquires one slot before issuing its cloud-API call and Releases after. Aggregate concurrent IO is now hard-bounded at `CUDLY_MAX_PARALLELISM` (default 20) regardless of nesting depth. Intermediates (provider, account, GCP region dispatcher) don't acquire — only leaves do — so no deadlock by holding-permit-while-waiting-for-sub-permits. `Acquire`/`Release` are no-ops when no semaphore is on ctx, so CLI tools and unit tests pass through unchanged. Mirrors the per-fan-out pattern but adds a single global bound; per-level caps still help with launch-side bookkeeping. New tests in `pkg/concurrency`: env-knob parser, no-semaphore no-ops, cap=3 bounds load test (20 goroutines, peak ≤ 3), ctx-cancel propagation. Scheduler mocks use `mock.Anything` for the now-wrapped ctx (`UpsertRecommendations`, `SendNewRecommendationsNotification`). Azure's `GetRecommendations` extracts a `goService` closure to keep the function under the project's 10-branch gocyclo gate after the per-service `Acquire` branches were added.
- `infra(aws/dev)` — bump dev Lambda memory 512 MB → 2 GB in `terraform/environments/aws/github-dev.tfvars`. Temporary headroom scoped to dev — the semaphore is the durable fix; this just gives the function room to breathe while we observe the new behaviour live. Staging/prod untouched.
Test plan
- `go build ./...` from repo root — clean
- `go test ./internal/scheduler/...` — pass (74 tests, including the 3 added in #269, "perf: parallelize recommendations collection at every level", closes #266 #267 #268)
- `go test ./recommendations/...` from `providers/aws/` — 176 pass
- `go test .` from `providers/azure/` — 104 pass
- `go test .` from `providers/gcp/` (recs subset) — 18 pass
- `go test ./concurrency/...` from `pkg/` — 11 pass
- On the deployed dev Lambda: check for the "Recommendations collection: aggregate parallelism cap = 20" log line on entry, and confirm `Max Memory Used` < 2048 MB and Duration < 60s on a full refresh.
🤖 Generated with claude-flow