Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 106 additions & 27 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,33 +168,23 @@ func (s *Scheduler) CollectRecommendations(ctx context.Context) (*CollectResult,
return nil, err
}

// Collect recommendations from each enabled provider, tracking per-provider
// outcomes so persistence can scope stale-row eviction to (provider, account)
// pairs that actually ran. A partial collection (e.g. one of three Azure
// subscriptions failed) preserves the failed pairs' previous-cycle rows
// instead of wiping the whole provider's slice.
var allRecommendations []config.RecommendationRecord
var totalSavings float64
var successfulProviders []string
var successfulCollects []config.SuccessfulCollect
failedProviders := map[string]string{}

for _, providerName := range globalCfg.EnabledProviders {
logging.Infof("Collecting recommendations from %s...", providerName)

recs, succeededAccountIDs, err := s.collectProviderRecommendations(ctx, providerName, globalCfg)
if err != nil {
logging.Errorf("Failed to collect %s recommendations: %v", providerName, err)
failedProviders[providerName] = err.Error()
continue
}
successfulProviders = append(successfulProviders, providerName)
successfulCollects = append(successfulCollects, expandSuccessfulCollects(providerName, succeededAccountIDs)...)

for _, rec := range recs {
totalSavings += rec.Savings
}
allRecommendations = append(allRecommendations, recs...)
// Collect recommendations from each enabled provider concurrently, tracking
// per-provider outcomes so persistence can scope stale-row eviction to
// (provider, account) pairs that actually ran. A partial collection (e.g.
// one of three Azure subscriptions failed) preserves the failed pairs'
// previous-cycle rows instead of wiping the whole provider's slice.
//
// Provider-level fan-out under errgroup. Each goroutine returns nil to the
// group so a single provider's failure does not cancel siblings — matches
// the previous loop's `continue`-on-error behaviour. Per-provider results
// are written into a map under a single mutex; the merge then walks
// EnabledProviders in config order so successfulProviders ordering is
// deterministic regardless of goroutine completion order. After Wait, ctx
// cancellation is propagated. No concurrency cap — the universe is at
// most 3 providers.
allRecommendations, totalSavings, successfulProviders, successfulCollects, failedProviders, err := s.collectAllProviders(ctx, globalCfg)
if err != nil {
return nil, err
}

logging.Infof("Collected %d recommendations with $%.2f/month potential savings",
Expand Down Expand Up @@ -254,6 +244,95 @@ func (s *Scheduler) CollectRecommendations(ctx context.Context) (*CollectResult,
// On any failure (including partial), the collection error is additionally
// recorded in recommendations_state so the frontend banner renders while
// the user still sees the valid rows we managed to upsert.
// providerOutcome bundles a single provider's collection outcome for the
// deterministic merge in collectAllProviders. Only one of recs/err is
// meaningful per outcome — err != nil means the provider failed entirely
// (mirrors the pre-fan-out `continue` branch); err == nil means the provider
// succeeded (possibly partially — partial-account-failure semantics live in
// fanOutPerAccount, not here).
type providerOutcome struct {
recs []config.RecommendationRecord
succeededAccountIDs []string
err error
}

// collectAllProviders fans out provider collection (AWS / Azure / GCP) under
// errgroup. Each goroutine returns nil so a single provider's error does not
// cancel siblings; per-provider results are written into a map under a single
// mutex. After Wait, ctx cancellation is propagated. The merge then walks
// EnabledProviders in config order so successfulProviders / successfulCollects
// / allRecommendations ordering is deterministic regardless of goroutine
// completion order — keeps existing tests stable. No concurrency cap; the
// universe is at most 3 providers.
//
// Extracted from CollectRecommendations to keep that function under the
// project's gocyclo gate (.golangci.yml min-complexity: 15) after the
// errgroup + post-Wait ctx.Err() block was added.
func (s *Scheduler) collectAllProviders(ctx context.Context, globalCfg *config.GlobalConfig) (
allRecommendations []config.RecommendationRecord,
totalSavings float64,
successfulProviders []string,
successfulCollects []config.SuccessfulCollect,
failedProviders map[string]string,
err error,
) {
failedProviders = map[string]string{}

var (
mu sync.Mutex
outcomes = make(map[string]providerOutcome, len(globalCfg.EnabledProviders))
)

g, gctx := errgroup.WithContext(ctx)

for _, providerName := range globalCfg.EnabledProviders {
providerName := providerName // capture per-iteration
g.Go(func() error {
logging.Infof("Collecting recommendations from %s...", providerName)
recs, succeededAccountIDs, perr := s.collectProviderRecommendations(gctx, providerName, globalCfg)
mu.Lock()
outcomes[providerName] = providerOutcome{
recs: recs,
succeededAccountIDs: succeededAccountIDs,
err: perr,
}
mu.Unlock()
return nil // error isolation: per-provider failures don't cancel siblings
})
}

// Wait for all goroutines. g.Wait() always returns nil because every
// goroutine returns nil. After Wait, propagate ctx cancellation so
// callers can distinguish "all providers completed" from "the parent
// ctx was canceled mid-fan-out".
_ = g.Wait()
if cerr := ctx.Err(); cerr != nil {
return nil, 0, nil, nil, nil, cerr
}

// Deterministic merge: walk EnabledProviders in config order so
// successfulProviders ordering is independent of goroutine completion
// order — keeps existing tests stable.
for _, providerName := range globalCfg.EnabledProviders {
out, ok := outcomes[providerName]
if !ok {
continue
}
if out.err != nil {
logging.Errorf("Failed to collect %s recommendations: %v", providerName, out.err)
failedProviders[providerName] = out.err.Error()
continue
}
successfulProviders = append(successfulProviders, providerName)
successfulCollects = append(successfulCollects, expandSuccessfulCollects(providerName, out.succeededAccountIDs)...)
for _, rec := range out.recs {
totalSavings += rec.Savings
}
allRecommendations = append(allRecommendations, out.recs...)
}
return allRecommendations, totalSavings, successfulProviders, successfulCollects, failedProviders, nil
}

// clearCollectionStartedBestEffort clears last_collection_started_at on the
// scheduler's exit path. Best-effort — a failure here is logged but does not
// prevent returning the collection result. Extracted so CollectRecommendations
Expand Down
143 changes: 126 additions & 17 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func TestScheduler_CollectRecommendations_AWSProvider(t *testing.T) {

mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil)
// Mock provider factory to return error (simulating no credentials)
mockFactory.On("CreateAndValidateProvider", ctx, mock.Anything, mock.Anything).
mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything).
Return(nil, assert.AnError)

scheduler := &Scheduler{
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestScheduler_CollectRecommendations_AllProviders(t *testing.T) {

mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil)
// Mock provider factory to return error for all providers (simulating no credentials)
mockFactory.On("CreateAndValidateProvider", ctx, mock.Anything, mock.Anything).
mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything).
Return(nil, assert.AnError)

scheduler := &Scheduler{
Expand All @@ -607,6 +607,110 @@ func TestScheduler_CollectRecommendations_AllProviders(t *testing.T) {
assert.Equal(t, 0, result.Recommendations)
}

// TestScheduler_CollectRecommendations_ParallelProviders pins three contracts
// of the provider-level fan-out introduced in collectAllProviders (closes
// #268):
//
// 1. successfulProviders ordering matches EnabledProviders config order, NOT
// goroutine completion order. We assert this with a deliberately non-
// alphabetical config order ["gcp", "aws", "azure"] so the result is
// distinguishable from any of: input order, alphabetical, or arbitrary
// map iteration.
// 2. A single provider's error does not cancel siblings — when the mock
// factory returns an error for "azure" and successes for "aws"+"gcp",
// the result still includes the successful providers and reports
// azure in failedProviders.
// 3. ctx cancellation propagates: a pre-cancelled ctx surfaces as
// context.Canceled (not a "successful but empty" CollectResult).
func TestScheduler_CollectRecommendations_ParallelProviders(t *testing.T) {
t.Run("successfulProviders ordering matches config order, not goroutine completion", func(t *testing.T) {
ctx := context.Background()
mockStore := new(MockConfigStore)
mockEmail := new(MockEmailSender)
mockFactory := new(MockProviderFactory)

// Deliberately non-alphabetical config order so we can distinguish
// "by config" from "by goroutine completion" from "alphabetical"
// from "by map-iteration order".
//
// MockConfigStore.ListCloudAccounts is a hardcoded stub that
// returns (nil, nil) — so Azure and GCP both hit their
// "no enabled accounts → return nil, nil, nil" early-return path
// (succeed with zero recommendations). AWS falls through to the
// ambient-credential collectAWSAmbient path which calls the
// provider factory; the mock returns assert.AnError so AWS lands
// in failedProviders.
//
// Expected merge under the deterministic-config-order walk:
// successfulProviders = ["gcp", "azure"] (skipping the failed
// "aws" entry, in
// config order)
// failedProviders = {"aws": "..."}
//
// If the merge instead walked the outcomes map (random Go map
// iteration), or sorted alphabetically, or used goroutine
// completion order, we'd see ["azure", "gcp"] or random ordering
// — distinguishable from the expected config-ordered result.
enabled := []string{"gcp", "aws", "azure"}
globalCfg := &config.GlobalConfig{
EnabledProviders: enabled,
DefaultTerm: 3,
DefaultPayment: "all-upfront",
}

mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil)
mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything).
Return(nil, assert.AnError)

scheduler := &Scheduler{
config: mockStore,
email: mockEmail,
dashboardURL: "https://dashboard.example.com",
providerFactory: mockFactory,
}

result, err := scheduler.CollectRecommendations(ctx)
require.NoError(t, err)

assert.Equal(t, []string{"gcp", "azure"}, result.SuccessfulProviders,
"successfulProviders must be in EnabledProviders config order, "+
"skipping the failed AWS entry; non-alphabetical config "+
"distinguishes config-order from sort-order")
assert.Len(t, result.FailedProviders, 1)
assert.Contains(t, result.FailedProviders, "aws")
})

t.Run("ctx cancellation propagates", func(t *testing.T) {
mockStore := new(MockConfigStore)
mockEmail := new(MockEmailSender)
mockFactory := new(MockProviderFactory)

globalCfg := &config.GlobalConfig{
EnabledProviders: []string{"aws", "azure", "gcp"},
}
// GetGlobalConfig is called pre-fan-out. We need it to succeed so
// we reach the fan-out, where the cancelled ctx is observed.
mockStore.On("GetGlobalConfig", mock.Anything).Return(globalCfg, nil)
mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything).
Return(nil, assert.AnError)

scheduler := &Scheduler{
config: mockStore,
email: mockEmail,
dashboardURL: "https://dashboard.example.com",
providerFactory: mockFactory,
}

ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel BEFORE the call

_, err := scheduler.CollectRecommendations(ctx)
require.Error(t, err, "expected context.Canceled to propagate from CollectRecommendations")
assert.ErrorIs(t, err, context.Canceled,
"CollectRecommendations must propagate the parent ctx error after the provider fan-out's g.Wait()")
})
}

func TestScheduler_CollectRecommendations_UnknownProvider(t *testing.T) {
ctx := context.Background()
mockStore := new(MockConfigStore)
Expand Down Expand Up @@ -643,7 +747,7 @@ func TestScheduler_CollectAWSRecommendations(t *testing.T) {
}

// Mock provider factory to return error (simulating no credentials)
mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).
mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).
Return(nil, assert.AnError)

scheduler := &Scheduler{
Expand All @@ -665,7 +769,7 @@ func TestScheduler_CollectAzureRecommendations_NoAccounts(t *testing.T) {
DefaultPayment: "all-upfront",
}

mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return([]config.CloudAccount{}, nil)
mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return([]config.CloudAccount{}, nil)

scheduler := &Scheduler{config: mockStore}

Expand All @@ -683,7 +787,7 @@ func TestScheduler_CollectGCPRecommendations_NoAccounts_Alt(t *testing.T) {
DefaultPayment: "all-upfront",
}

mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return([]config.CloudAccount{}, nil)
mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return([]config.CloudAccount{}, nil)

scheduler := &Scheduler{config: mockStore}

Expand All @@ -703,10 +807,10 @@ func TestScheduler_CollectProviderRecommendations(t *testing.T) {
}

// AWS ambient fallback: factory returns error
mockFactory.On("CreateAndValidateProvider", ctx, mock.Anything, mock.Anything).
mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything).
Return(nil, assert.AnError)
// Azure/GCP: no accounts → skip gracefully
mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return([]config.CloudAccount{}, nil)
mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return([]config.CloudAccount{}, nil)

scheduler := &Scheduler{
config: mockStore,
Expand Down Expand Up @@ -753,7 +857,7 @@ func TestScheduler_CollectRecommendations_WithNotification(t *testing.T) {

mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil)
// Mock provider factory to return error (simulating no credentials)
mockFactory.On("CreateAndValidateProvider", ctx, mock.Anything, mock.Anything).
mockFactory.On("CreateAndValidateProvider", mock.Anything, mock.Anything, mock.Anything).
Return(nil, assert.AnError)
// No expectation for SendNewRecommendationsNotification because
// there are no recommendations
Expand Down Expand Up @@ -1455,7 +1559,7 @@ func TestScheduler_CollectAWSRecommendations_Success(t *testing.T) {
},
}

mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil)
mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil)
mockProvider.On("GetRecommendationsClient", ctx).Return(mockRecClient, nil)
mockRecClient.On("GetAllRecommendations", ctx).Return(recommendations, nil)

Expand All @@ -1482,7 +1586,7 @@ func TestScheduler_CollectAWSRecommendations_RecClientError(t *testing.T) {
DefaultPayment: "all-upfront",
}

mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil)
mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil)
mockProvider.On("GetRecommendationsClient", ctx).Return(nil, assert.AnError)

scheduler := &Scheduler{
Expand All @@ -1508,7 +1612,7 @@ func TestScheduler_CollectAWSRecommendations_GetRecsError(t *testing.T) {
DefaultPayment: "all-upfront",
}

mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil)
mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil)
mockProvider.On("GetRecommendationsClient", ctx).Return(mockRecClient, nil)
mockRecClient.On("GetAllRecommendations", ctx).Return(nil, assert.AnError)

Expand Down Expand Up @@ -1542,7 +1646,7 @@ func TestScheduler_CollectAzureRecommendations_Success(t *testing.T) {
Enabled: true,
},
}
mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return(azureAccounts, nil)
mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return(azureAccounts, nil)

// The managed_identity path will try DefaultAzureCredential which will
// fail in tests, so we expect an error log but no crash.
Expand All @@ -1567,7 +1671,7 @@ func TestScheduler_CollectGCPRecommendations_NoAccounts(t *testing.T) {
DefaultPayment: "all-upfront",
}

mockStore.On("ListCloudAccounts", ctx, mock.Anything).Return([]config.CloudAccount{}, nil)
mockStore.On("ListCloudAccounts", mock.Anything, mock.Anything).Return([]config.CloudAccount{}, nil)

scheduler := &Scheduler{
config: mockStore,
Expand Down Expand Up @@ -1606,9 +1710,14 @@ func TestScheduler_CollectRecommendations_WithSuccessfulRecs(t *testing.T) {
}

mockStore.On("GetGlobalConfig", ctx).Return(globalCfg, nil)
mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil)
mockProvider.On("GetRecommendationsClient", ctx).Return(mockRecClient, nil)
mockRecClient.On("GetAllRecommendations", ctx).Return(recommendations, nil)
mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil)
// Provider/RecClient are invoked from inside the per-provider errgroup
// goroutine in collectAllProviders, so they receive the errgroup-derived
// gctx rather than the caller's ctx. mock.Anything keeps the assertion
// resilient to that wrap (the post-Wait ctx.Err() check + cancellation
// contract test cover the cancellation path).
mockProvider.On("GetRecommendationsClient", mock.Anything).Return(mockRecClient, nil)
mockRecClient.On("GetAllRecommendations", mock.Anything).Return(recommendations, nil)
mockEmail.On("SendNewRecommendationsNotification", ctx, mock.AnythingOfType("email.NotificationData")).Return(nil)

scheduler := &Scheduler{
Expand Down Expand Up @@ -1652,7 +1761,7 @@ func TestScheduler_CollectAWSRecommendations_FallbackToFiltered(t *testing.T) {
},
}

mockFactory.On("CreateAndValidateProvider", ctx, "aws", mock.Anything).Return(mockProvider, nil)
mockFactory.On("CreateAndValidateProvider", mock.Anything, "aws", mock.Anything).Return(mockProvider, nil)
mockProvider.On("GetRecommendationsClient", ctx).Return(mockRecClient, nil)
mockRecClient.On("GetAllRecommendations", ctx).Return([]common.Recommendation{}, nil) // Empty
mockRecClient.On("GetRecommendations", ctx, mock.AnythingOfType("common.RecommendationParams")).Return(filteredRecommendations, nil)
Expand Down
Loading