Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 109 additions & 60 deletions internal/commitmentopts/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ const pageSize int32 = 100
// long-standing public availability. The targets never round-trip through
// a purchase so their cost is irrelevant; what matters is that AWS
// actually has offerings to return for them.
//
// probeTargetMemoryDB carries an empirical risk: MemoryDB reserved-node
// coverage has historically skewed to db.r6g.* tiers, and db.t4g.small may
// return an empty offerings list in some regions. If that happens the
// orchestrating Service still persists the run (so we don't re-probe in a
// hot loop) and the frontend silently falls back to hardcoded MemoryDB
// rules. Switching to db.r6g.large would be the safe alternative once a
// human with AWS creds confirms via:
//
// aws memorydb describe-reserved-nodes-offerings \
// --region us-east-1 --node-type db.t4g.small
//
// Tracked in github.com/LeanerCloud/CUDly#61.
const (
probeTargetRDS = "db.t3.micro"
probeTargetElastiCache = "cache.t3.micro"
Expand All @@ -38,6 +51,48 @@ const (
probeTargetEC2 = "t3.micro"
)

// walkPaginated runs the standard paginated-Describe loop shared by every
// per-service prober: call fetchPage, accumulate the rawOffers it yields,
// thread the next-page token, and stop after maxPages iterations even if
// AWS keeps returning a non-empty token. Each prober supplies its own
// fetchPage closure that handles per-API quirks (Marker vs NextToken,
// per-item shape conversion, optional client-side instance-type filter).
//
// The loop is broken out into a single helper so the page cap is exercised
// by one unit test (TestWalkPaginated_StopsAtPageCap) rather than six
// near-duplicate tests, and so a refactor can't silently lose the cap on
// one prober while leaving the others intact.
//
// fetchPage receives the token from the previous page (nil on the first
// call) and returns:
// - the rawOffers extracted from this page
// - the next-page token (nil or "" means "no more pages")
// - any API error
//
// service is the canonical service name; it's used to wrap fetchPage errors
// as "<service>: <err>" so callers see which prober blew up without each
// one having to repeat the wrap.
func walkPaginated(
ctx context.Context,
service string,
fetchPage func(ctx context.Context, token *string) ([]rawOffer, *string, error),
) ([]rawOffer, error) {
var raw []rawOffer
var token *string
for page := 0; page < maxPages; page++ {
offers, next, err := fetchPage(ctx, token)
if err != nil {
return nil, fmt.Errorf("%s: %w", service, err)
}
raw = append(raw, offers...)
if next == nil || aws.ToString(next) == "" {
break
}
token = next
}
return raw, nil
}

// collect dedupes a probe's raw (durationSeconds, rawPayment) pairs, runs
// both normalizers, and builds the Combo slice. Duplicates — a single
// (term, payment) tuple appears once per instance size × AZ × engine
Expand Down Expand Up @@ -107,27 +162,26 @@ func (p *RDSProber) Service() string { return "rds" }
// against db.t3.micro.
func (p *RDSProber) Probe(ctx context.Context, cfg aws.Config) ([]Combo, error) {
client := p.client(cfg)
var raw []rawOffer
var marker *string
for page := 0; page < maxPages; page++ {
raw, err := walkPaginated(ctx, p.Service(), func(ctx context.Context, token *string) ([]rawOffer, *string, error) {
out, err := client.DescribeReservedDBInstancesOfferings(ctx, &rds.DescribeReservedDBInstancesOfferingsInput{
DBInstanceClass: aws.String(probeTargetRDS),
MaxRecords: aws.Int32(pageSize),
Marker: marker,
Marker: token,
})
if err != nil {
return nil, fmt.Errorf("rds: %w", err)
return nil, nil, err
}
offers := make([]rawOffer, 0, len(out.ReservedDBInstancesOfferings))
for _, o := range out.ReservedDBInstancesOfferings {
raw = append(raw, rawOffer{
offers = append(offers, rawOffer{
durationSeconds: int64(aws.ToInt32(o.Duration)),
payment: aws.ToString(o.OfferingType),
})
}
if out.Marker == nil || aws.ToString(out.Marker) == "" {
break
}
marker = out.Marker
return offers, out.Marker, nil
})
if err != nil {
return nil, err
}
return collect(p.Service(), raw), nil
}
Expand Down Expand Up @@ -159,27 +213,26 @@ func (p *ElastiCacheProber) Service() string { return "elasticache" }
// Probe returns the combos for cache.t3.micro.
func (p *ElastiCacheProber) Probe(ctx context.Context, cfg aws.Config) ([]Combo, error) {
client := p.client(cfg)
var raw []rawOffer
var marker *string
for page := 0; page < maxPages; page++ {
raw, err := walkPaginated(ctx, p.Service(), func(ctx context.Context, token *string) ([]rawOffer, *string, error) {
out, err := client.DescribeReservedCacheNodesOfferings(ctx, &elasticache.DescribeReservedCacheNodesOfferingsInput{
CacheNodeType: aws.String(probeTargetElastiCache),
MaxRecords: aws.Int32(pageSize),
Marker: marker,
Marker: token,
})
if err != nil {
return nil, fmt.Errorf("elasticache: %w", err)
return nil, nil, err
}
offers := make([]rawOffer, 0, len(out.ReservedCacheNodesOfferings))
for _, o := range out.ReservedCacheNodesOfferings {
raw = append(raw, rawOffer{
offers = append(offers, rawOffer{
durationSeconds: int64(aws.ToInt32(o.Duration)),
payment: aws.ToString(o.OfferingType),
})
}
if out.Marker == nil || aws.ToString(out.Marker) == "" {
break
}
marker = out.Marker
return offers, out.Marker, nil
})
if err != nil {
return nil, err
}
return collect(p.Service(), raw), nil
}
Expand Down Expand Up @@ -213,29 +266,28 @@ func (p *OpenSearchProber) Service() string { return "opensearch" }
// Probe returns the combos for t3.small.search.
func (p *OpenSearchProber) Probe(ctx context.Context, cfg aws.Config) ([]Combo, error) {
client := p.client(cfg)
var raw []rawOffer
var nextToken *string
for page := 0; page < maxPages; page++ {
raw, err := walkPaginated(ctx, p.Service(), func(ctx context.Context, token *string) ([]rawOffer, *string, error) {
out, err := client.DescribeReservedInstanceOfferings(ctx, &opensearch.DescribeReservedInstanceOfferingsInput{
MaxResults: pageSize,
NextToken: nextToken,
NextToken: token,
})
if err != nil {
return nil, fmt.Errorf("opensearch: %w", err)
return nil, nil, err
}
offers := make([]rawOffer, 0, len(out.ReservedInstanceOfferings))
for _, o := range out.ReservedInstanceOfferings {
if string(o.InstanceType) != probeTargetOpenSearch {
continue
}
raw = append(raw, rawOffer{
offers = append(offers, rawOffer{
durationSeconds: int64(o.Duration),
payment: string(o.PaymentOption),
})
}
if out.NextToken == nil || aws.ToString(out.NextToken) == "" {
break
}
nextToken = out.NextToken
return offers, out.NextToken, nil
})
if err != nil {
return nil, err
}
return collect(p.Service(), raw), nil
}
Expand Down Expand Up @@ -269,29 +321,28 @@ func (p *RedshiftProber) Service() string { return "redshift" }
// Probe returns the combos for dc2.large.
func (p *RedshiftProber) Probe(ctx context.Context, cfg aws.Config) ([]Combo, error) {
client := p.client(cfg)
var raw []rawOffer
var marker *string
for page := 0; page < maxPages; page++ {
raw, err := walkPaginated(ctx, p.Service(), func(ctx context.Context, token *string) ([]rawOffer, *string, error) {
out, err := client.DescribeReservedNodeOfferings(ctx, &redshift.DescribeReservedNodeOfferingsInput{
MaxRecords: aws.Int32(pageSize),
Marker: marker,
Marker: token,
})
if err != nil {
return nil, fmt.Errorf("redshift: %w", err)
return nil, nil, err
}
offers := make([]rawOffer, 0, len(out.ReservedNodeOfferings))
for _, o := range out.ReservedNodeOfferings {
if aws.ToString(o.NodeType) != probeTargetRedshift {
continue
}
raw = append(raw, rawOffer{
offers = append(offers, rawOffer{
durationSeconds: int64(aws.ToInt32(o.Duration)),
payment: aws.ToString(o.OfferingType),
})
}
if out.Marker == nil || aws.ToString(out.Marker) == "" {
break
}
marker = out.Marker
return offers, out.Marker, nil
})
if err != nil {
return nil, err
}
return collect(p.Service(), raw), nil
}
Expand Down Expand Up @@ -323,27 +374,26 @@ func (p *MemoryDBProber) Service() string { return "memorydb" }
// Probe returns the combos for db.t4g.small.
func (p *MemoryDBProber) Probe(ctx context.Context, cfg aws.Config) ([]Combo, error) {
client := p.client(cfg)
var raw []rawOffer
var nextToken *string
for page := 0; page < maxPages; page++ {
raw, err := walkPaginated(ctx, p.Service(), func(ctx context.Context, token *string) ([]rawOffer, *string, error) {
out, err := client.DescribeReservedNodesOfferings(ctx, &memorydb.DescribeReservedNodesOfferingsInput{
NodeType: aws.String(probeTargetMemoryDB),
MaxResults: aws.Int32(pageSize),
NextToken: nextToken,
NextToken: token,
})
if err != nil {
return nil, fmt.Errorf("memorydb: %w", err)
return nil, nil, err
}
offers := make([]rawOffer, 0, len(out.ReservedNodesOfferings))
for _, o := range out.ReservedNodesOfferings {
raw = append(raw, rawOffer{
offers = append(offers, rawOffer{
durationSeconds: int64(o.Duration),
payment: aws.ToString(o.OfferingType),
})
}
if out.NextToken == nil || aws.ToString(out.NextToken) == "" {
break
}
nextToken = out.NextToken
return offers, out.NextToken, nil
})
if err != nil {
return nil, err
}
return collect(p.Service(), raw), nil
}
Expand Down Expand Up @@ -378,28 +428,27 @@ func (p *EC2Prober) Service() string { return "ec2" }
// normalization.
func (p *EC2Prober) Probe(ctx context.Context, cfg aws.Config) ([]Combo, error) {
client := p.client(cfg)
var raw []rawOffer
var nextToken *string
for page := 0; page < maxPages; page++ {
raw, err := walkPaginated(ctx, p.Service(), func(ctx context.Context, token *string) ([]rawOffer, *string, error) {
out, err := client.DescribeReservedInstancesOfferings(ctx, &ec2.DescribeReservedInstancesOfferingsInput{
InstanceType: ec2types.InstanceType(probeTargetEC2),
IncludeMarketplace: aws.Bool(false),
MaxResults: aws.Int32(pageSize),
NextToken: nextToken,
NextToken: token,
})
if err != nil {
return nil, fmt.Errorf("ec2: %w", err)
return nil, nil, err
}
offers := make([]rawOffer, 0, len(out.ReservedInstancesOfferings))
for _, o := range out.ReservedInstancesOfferings {
raw = append(raw, rawOffer{
offers = append(offers, rawOffer{
durationSeconds: aws.ToInt64(o.Duration),
payment: string(o.OfferingType),
})
}
if out.NextToken == nil || aws.ToString(out.NextToken) == "" {
break
}
nextToken = out.NextToken
return offers, out.NextToken, nil
})
if err != nil {
return nil, err
}
return collect(p.Service(), raw), nil
}
Expand Down
Loading