Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions pkg/parser/schedule_fuzzy_scatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ func stableHash(s string, modulo int) int {
// ScatterSchedule takes a fuzzy cron expression and a workflow identifier
// and returns a deterministic scattered time for that workflow
func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
scheduleFuzzyScatterLog.Printf("Scattering schedule: fuzzyCron=%s, workflowId=%s", fuzzyCron, workflowIdentifier)
if !IsFuzzyCron(fuzzyCron) {
scheduleFuzzyScatterLog.Printf("Invalid fuzzy cron expression: %s", fuzzyCron)
return "", fmt.Errorf("not a fuzzy schedule: %s", fuzzyCron)
}

Expand Down Expand Up @@ -85,8 +87,10 @@ func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
hour := scatteredMinutes / 60
minute := scatteredMinutes % 60

result := fmt.Sprintf("%d %d * * *", minute, hour)
scheduleFuzzyScatterLog.Printf("FUZZY:DAILY_AROUND scattered: original=%d:%d, scattered=%d:%d, result=%s", targetHour, targetMinute, hour, minute, result)
// Return scattered daily cron: minute hour * * *
return fmt.Sprintf("%d %d * * *", minute, hour), nil
return result, nil
}

// For FUZZY:DAILY_BETWEEN:START_H:START_M:END_H:END_M * * *, scatter within the time range
Expand Down Expand Up @@ -152,8 +156,10 @@ func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
hour := scatteredMinutes / 60
minute := scatteredMinutes % 60

result := fmt.Sprintf("%d %d * * *", minute, hour)
scheduleFuzzyScatterLog.Printf("FUZZY:DAILY_BETWEEN scattered: start=%d:%d, end=%d:%d, scattered=%d:%d, result=%s", startHour, startMinute, endHour, endMinute, hour, minute, result)
// Return scattered daily cron: minute hour * * *
return fmt.Sprintf("%d %d * * *", minute, hour), nil
return result, nil
}

// For FUZZY:DAILY * * *, we scatter across 24 hours
Expand All @@ -164,8 +170,10 @@ func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
hour := hash / 60
minute := hash % 60

result := fmt.Sprintf("%d %d * * *", minute, hour)
scheduleFuzzyScatterLog.Printf("FUZZY:DAILY scattered: hash=%d, result=%s", hash, result)
// Return scattered daily cron: minute hour * * *
return fmt.Sprintf("%d %d * * *", minute, hour), nil
return result, nil
}

// For FUZZY:HOURLY/N * * *, we scatter the minute offset within the hour
Expand All @@ -186,8 +194,10 @@ func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
// Use a stable hash to get a deterministic minute offset (0-59)
minute := stableHash(workflowIdentifier, 60)

result := fmt.Sprintf("%d */%d * * *", minute, interval)
scheduleFuzzyScatterLog.Printf("FUZZY:HOURLY/%d scattered: minute=%d, result=%s", interval, minute, result)
// Return scattered hourly cron: minute */N * * *
return fmt.Sprintf("%d */%d * * *", minute, interval), nil
return result, nil
}

// For FUZZY:WEEKLY_AROUND:DOW:HH:MM * * *, scatter around the target time on specific weekday
Expand Down Expand Up @@ -242,8 +252,10 @@ func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
hour := scatteredMinutes / 60
minute := scatteredMinutes % 60

result := fmt.Sprintf("%d %d * * %s", minute, hour, weekday)
scheduleFuzzyScatterLog.Printf("FUZZY:WEEKLY_AROUND scattered: weekday=%s, target=%d:%d, scattered=%d:%d, result=%s", weekday, targetHour, targetMinute, hour, minute, result)
// Return scattered weekly cron: minute hour * * DOW
return fmt.Sprintf("%d %d * * %s", minute, hour, weekday), nil
return result, nil
}

// For FUZZY:WEEKLY:DOW * * *, we scatter time on specific weekday
Expand All @@ -263,8 +275,10 @@ func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
hour := hash / 60
minute := hash % 60

result := fmt.Sprintf("%d %d * * %s", minute, hour, weekday)
scheduleFuzzyScatterLog.Printf("FUZZY:WEEKLY:%s scattered: hash=%d, result=%s", weekday, hash, result)
// Return scattered weekly cron: minute hour * * DOW
return fmt.Sprintf("%d %d * * %s", minute, hour, weekday), nil
return result, nil
}

// For FUZZY:WEEKLY * * *, we scatter across all weekdays and times
Expand All @@ -279,8 +293,10 @@ func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
hour := minutesInDay / 60
minute := minutesInDay % 60

result := fmt.Sprintf("%d %d * * %d", minute, hour, weekday)
scheduleFuzzyScatterLog.Printf("FUZZY:WEEKLY scattered: weekday=%d, time=%d:%d, result=%s", weekday, hour, minute, result)
// Return scattered weekly cron: minute hour * * DOW
return fmt.Sprintf("%d %d * * %d", minute, hour, weekday), nil
return result, nil
}

// For FUZZY:BI_WEEKLY * * *, we scatter across 2 weeks (14 days)
Expand All @@ -294,9 +310,11 @@ func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
hour := minutesInDay / 60
minute := minutesInDay % 60

result := fmt.Sprintf("%d %d */%d * *", minute, hour, 14)
scheduleFuzzyScatterLog.Printf("FUZZY:BI_WEEKLY scattered: time=%d:%d, result=%s", hour, minute, result)
// Convert to cron: We use day-of-month pattern with 14-day interval
// Schedule every 14 days at the scattered time
return fmt.Sprintf("%d %d */%d * *", minute, hour, 14), nil
return result, nil
}

// For FUZZY:TRI_WEEKLY * * *, we scatter across 3 weeks (21 days)
Expand All @@ -310,10 +328,13 @@ func ScatterSchedule(fuzzyCron, workflowIdentifier string) (string, error) {
hour := minutesInDay / 60
minute := minutesInDay % 60

result := fmt.Sprintf("%d %d */%d * *", minute, hour, 21)
scheduleFuzzyScatterLog.Printf("FUZZY:TRI_WEEKLY scattered: time=%d:%d, result=%s", hour, minute, result)
// Convert to cron: We use day-of-month pattern with 21-day interval
// Schedule every 21 days at the scattered time
return fmt.Sprintf("%d %d */%d * *", minute, hour, 21), nil
return result, nil
}

scheduleFuzzyScatterLog.Printf("Unsupported fuzzy schedule type: %s", fuzzyCron)
return "", fmt.Errorf("unsupported fuzzy schedule type: %s", fuzzyCron)
}
22 changes: 21 additions & 1 deletion pkg/workflow/runtime_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ var runtimeValidationLog = logger.New("workflow:runtime_validation")

// validateExpressionSizes validates that no expression values in the generated YAML exceed GitHub Actions limits
func (c *Compiler) validateExpressionSizes(yamlContent string) error {
runtimeValidationLog.Print("Validating expression sizes in generated YAML")
lines := strings.Split(yamlContent, "\n")
runtimeValidationLog.Printf("Validating expression sizes: yaml_lines=%d, max_size=%d", len(lines), MaxExpressionSize)
maxSize := MaxExpressionSize

for lineNum, line := range lines {
Expand Down Expand Up @@ -97,6 +97,7 @@ func (c *Compiler) validateExpressionSizes(yamlContent string) error {
// validateContainerImages validates that container images specified in MCP configs exist and are accessible
func (c *Compiler) validateContainerImages(workflowData *WorkflowData) error {
if workflowData.Tools == nil {
runtimeValidationLog.Print("No tools configured, skipping container validation")
return nil
}

Expand Down Expand Up @@ -145,6 +146,7 @@ func (c *Compiler) validateContainerImages(workflowData *WorkflowData) error {
)
}

runtimeValidationLog.Print("Container image validation passed")
return nil
}

Expand All @@ -159,23 +161,30 @@ func (c *Compiler) validateRuntimePackages(workflowData *WorkflowData) error {
switch req.Runtime.ID {
case "node":
// Validate npx packages used in the workflow
runtimeValidationLog.Print("Validating npx packages")
if err := c.validateNpxPackages(workflowData); err != nil {
runtimeValidationLog.Printf("Npx package validation failed: %v", err)
errors = append(errors, err.Error())
}
case "python":
// Validate pip packages used in the workflow
runtimeValidationLog.Print("Validating pip packages")
if err := c.validatePipPackages(workflowData); err != nil {
runtimeValidationLog.Printf("Pip package validation failed: %v", err)
errors = append(errors, err.Error())
}
case "uv":
// Validate uv packages used in the workflow
runtimeValidationLog.Print("Validating uv packages")
if err := c.validateUvPackages(workflowData); err != nil {
runtimeValidationLog.Printf("Uv package validation failed: %v", err)
errors = append(errors, err.Error())
}
}
}

if len(errors) > 0 {
runtimeValidationLog.Printf("Runtime package validation completed with %d errors", len(errors))
return NewValidationError(
"runtime.packages",
fmt.Sprintf("%d package validation errors", len(errors)),
Expand All @@ -184,6 +193,7 @@ func (c *Compiler) validateRuntimePackages(workflowData *WorkflowData) error {
)
}

runtimeValidationLog.Print("Runtime package validation passed")
return nil
}

Expand All @@ -195,6 +205,7 @@ func collectPackagesFromWorkflow(
extractor func(string) []string,
toolCommand string,
) []string {
runtimeValidationLog.Printf("Collecting packages from workflow: toolCommand=%s", toolCommand)
var packages []string
seen := make(map[string]bool)

Expand Down Expand Up @@ -264,14 +275,17 @@ func collectPackagesFromWorkflow(
}
}

runtimeValidationLog.Printf("Collected %d unique packages", len(packages))
return packages
}

// validateNoDuplicateCacheIDs checks for duplicate cache IDs and returns an error if found
func validateNoDuplicateCacheIDs(caches []CacheMemoryEntry) error {
runtimeValidationLog.Printf("Validating cache IDs: checking %d caches for duplicates", len(caches))
seen := make(map[string]bool)
for _, cache := range caches {
if seen[cache.ID] {
runtimeValidationLog.Printf("Duplicate cache ID found: %s", cache.ID)
return NewValidationError(
"sandbox.cache-memory",
cache.ID,
Expand All @@ -281,16 +295,19 @@ func validateNoDuplicateCacheIDs(caches []CacheMemoryEntry) error {
}
seen[cache.ID] = true
}
runtimeValidationLog.Print("Cache ID validation passed: no duplicates found")
return nil
}

// validateSecretReferences validates that secret references are valid
func validateSecretReferences(secrets []string) error {
runtimeValidationLog.Printf("Validating secret references: checking %d secrets", len(secrets))
// Secret names must be valid environment variable names
secretNamePattern := regexp.MustCompile(`^[A-Z][A-Z0-9_]*$`)

for _, secret := range secrets {
if !secretNamePattern.MatchString(secret) {
runtimeValidationLog.Printf("Invalid secret name format: %s", secret)
return NewValidationError(
"secrets",
secret,
Expand All @@ -310,11 +327,14 @@ func (c *Compiler) validateFirewallConfig(workflowData *WorkflowData) error {
}

config := workflowData.NetworkPermissions.Firewall
runtimeValidationLog.Printf("Validating firewall config: enabled=%v, logLevel=%s", config.Enabled, config.LogLevel)
if config.LogLevel != "" {
if err := ValidateLogLevel(config.LogLevel); err != nil {
runtimeValidationLog.Printf("Invalid firewall log level: %s", config.LogLevel)
return err
}
}

runtimeValidationLog.Print("Firewall config validation passed")
return nil
}