From 0504c06f7df318807127f80cfe59835ab4a30a17 Mon Sep 17 00:00:00 2001 From: Caleb Gross Date: Sat, 21 Mar 2026 11:53:05 -0400 Subject: [PATCH] feat: make reactor cooldowns and startup delays configurable Add reactor.cooldowns config section for per-chain cooldown overrides, consolidation startup delay, and orchestrator health report interval. Reactor cooldowns use a map[string]string keyed by chain ID, parsed to time.Duration at startup. Unset chains keep their built-in defaults. Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/mnemonic/main.go | 33 ++++++++++++++----- internal/agent/consolidation/agent.go | 10 ++++-- internal/agent/orchestrator/orchestrator.go | 19 +++++++---- internal/agent/reactor/registry.go | 31 ++++++++++++------ internal/config/config.go | 36 +++++++++++++++------ 5 files changed, 92 insertions(+), 37 deletions(-) diff --git a/cmd/mnemonic/main.go b/cmd/mnemonic/main.go index b3c8d4d3..bad38838 100644 --- a/cmd/mnemonic/main.go +++ b/cmd/mnemonic/main.go @@ -1621,12 +1621,13 @@ func serveCommand(configPath string) { var orch *orchestrator.Orchestrator if cfg.Orchestrator.Enabled { orch = orchestrator.NewOrchestrator(memStore, wrap("orchestrator"), orchestrator.OrchestratorConfig{ - AdaptiveIntervals: cfg.Orchestrator.AdaptiveIntervals, - MaxDBSizeMB: cfg.Orchestrator.MaxDBSizeMB, - SelfTestInterval: cfg.Orchestrator.SelfTestInterval, - AutoRecovery: cfg.Orchestrator.AutoRecovery, - HealthReportPath: filepath.Join(filepath.Dir(cfg.Store.DBPath), "health.json"), - MonitorInterval: cfg.Orchestrator.MonitorInterval, + AdaptiveIntervals: cfg.Orchestrator.AdaptiveIntervals, + MaxDBSizeMB: cfg.Orchestrator.MaxDBSizeMB, + SelfTestInterval: cfg.Orchestrator.SelfTestInterval, + AutoRecovery: cfg.Orchestrator.AutoRecovery, + HealthReportPath: filepath.Join(filepath.Dir(cfg.Store.DBPath), "health.json"), + MonitorInterval: cfg.Orchestrator.MonitorInterval, + HealthReportInterval: cfg.Orchestrator.HealthReportInterval, }, log) if err := orch.Start(rootCtx, bus); err != nil { @@ -1643,9 +1644,24 @@ func serveCommand(configPath string) { reactorLog := log.With("component", "reactor") reactorEngine := reactor.NewEngine(memStore, bus, reactorLog) + // Parse reactor cooldown overrides from config + var cooldownOverrides map[string]time.Duration + if len(cfg.Reactor.Cooldowns) > 0 { + cooldownOverrides = make(map[string]time.Duration, len(cfg.Reactor.Cooldowns)) + for chainID, durStr := range cfg.Reactor.Cooldowns { + d, err := time.ParseDuration(durStr) + if err != nil { + log.Warn("invalid reactor cooldown duration, ignoring", "chain_id", chainID, "value", durStr, "error", err) + continue + } + cooldownOverrides[chainID] = d + } + } + deps := reactor.ChainDeps{ - MaxDBSizeMB: cfg.Orchestrator.MaxDBSizeMB, - Logger: reactorLog, + MaxDBSizeMB: cfg.Orchestrator.MaxDBSizeMB, + CooldownOverrides: cooldownOverrides, + Logger: reactorLog, } if consolidator != nil { deps.ConsolidationTrigger = consolidator.GetTriggerChannel() @@ -1913,6 +1929,7 @@ func toConsolidationConfig(cfg *config.Config) consolidation.ConsolidationConfig SelfSustainingMinStrength: float32(cfg.Consolidation.SelfSustainingMinStrength), SelfSustainingDecay: float32(cfg.Consolidation.SelfSustainingDecay), NeverRecalledArchiveDays: cfg.Consolidation.NeverRecalledArchiveDays, + StartupDelay: time.Duration(cfg.Consolidation.StartupDelaySec) * time.Second, } } diff --git a/internal/agent/consolidation/agent.go b/internal/agent/consolidation/agent.go index 3d8dab6d..1db1c6ba 100644 --- a/internal/agent/consolidation/agent.go +++ b/internal/agent/consolidation/agent.go @@ -60,6 +60,9 @@ type ConsolidationConfig struct { // Never-recalled watcher memory archival NeverRecalledArchiveDays int // archive non-MCP memories with 0 access after this many days (default 30, 0=disabled) + + // Startup delay + StartupDelay time.Duration // grace period before first cycle (default 30s) } // DefaultConfig returns sensible defaults for consolidation. @@ -204,8 +207,11 @@ func (ca *ConsolidationAgent) consolidationLoop() { ticker := time.NewTicker(ca.config.Interval) defer ticker.Stop() - // Run one cycle shortly after startup (30s grace period) - startupTimer := time.NewTimer(30 * time.Second) + startupDelay := ca.config.StartupDelay + if startupDelay <= 0 { + startupDelay = 30 * time.Second + } + startupTimer := time.NewTimer(startupDelay) defer startupTimer.Stop() runAndLog := func(trigger string) { diff --git a/internal/agent/orchestrator/orchestrator.go b/internal/agent/orchestrator/orchestrator.go index a84e5d8a..8d52e53b 100644 --- a/internal/agent/orchestrator/orchestrator.go +++ b/internal/agent/orchestrator/orchestrator.go @@ -17,12 +17,13 @@ import ( // OrchestratorConfig configures the autonomous orchestrator. type OrchestratorConfig struct { - AdaptiveIntervals bool - MaxDBSizeMB int - SelfTestInterval time.Duration - AutoRecovery bool - HealthReportPath string // e.g. "~/.mnemonic/health.json" - MonitorInterval time.Duration + AdaptiveIntervals bool + MaxDBSizeMB int + SelfTestInterval time.Duration + AutoRecovery bool + HealthReportPath string // e.g. "~/.mnemonic/health.json" + MonitorInterval time.Duration + HealthReportInterval time.Duration // how often to write health reports (default 5m) } // HealthReport is the machine-readable health status written periodically. @@ -337,7 +338,11 @@ func (o *Orchestrator) runSelfTest(ctx context.Context) { func (o *Orchestrator) healthReportLoop() { defer o.wg.Done() - ticker := time.NewTicker(5 * time.Minute) + reportInterval := o.config.HealthReportInterval + if reportInterval <= 0 { + reportInterval = 5 * time.Minute + } + ticker := time.NewTicker(reportInterval) defer ticker.Stop() // Write initial report diff --git a/internal/agent/reactor/registry.go b/internal/agent/reactor/registry.go index 45ffc156..801a8da2 100644 --- a/internal/agent/reactor/registry.go +++ b/internal/agent/reactor/registry.go @@ -18,9 +18,20 @@ type ChainDeps struct { DreamingTrigger chan<- struct{} IncrementAutonomous func() MaxDBSizeMB int + CooldownOverrides map[string]time.Duration // chain ID -> cooldown override Logger *slog.Logger } +// cooldown returns the override duration for a chain if set, otherwise the default. +func (d ChainDeps) cooldown(chainID string, defaultDuration time.Duration) time.Duration { + if d.CooldownOverrides != nil { + if override, ok := d.CooldownOverrides[chainID]; ok && override > 0 { + return override + } + } + return defaultDuration +} + // NewChainRegistry creates a registry with all built-in reactive chains. func NewChainRegistry(deps ChainDeps) []*Chain { log := deps.Logger @@ -40,7 +51,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { }, &CooldownCondition{ ChainID: "meta_consolidation_on_dead_ratio", - Duration: 30 * time.Minute, + Duration: deps.cooldown("meta_consolidation_on_dead_ratio", 30*time.Minute), }, }, Actions: []Action{ @@ -56,7 +67,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { Log: log, }, }, - Cooldown: 30 * time.Minute, + Cooldown: deps.cooldown("meta_consolidation_on_dead_ratio", 30*time.Minute), Priority: 10, Enabled: true, }) @@ -72,7 +83,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { &DBSizeCondition{MaxSizeMB: deps.MaxDBSizeMB}, &CooldownCondition{ ChainID: "orch_consolidation_on_db_size", - Duration: 1 * time.Hour, + Duration: deps.cooldown("orch_consolidation_on_db_size", 1*time.Hour), }, }, Actions: []Action{ @@ -87,7 +98,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { Increment: deps.IncrementAutonomous, }, }, - Cooldown: 1 * time.Hour, + Cooldown: deps.cooldown("orch_consolidation_on_db_size", 1*time.Hour), Priority: 5, Enabled: true, }) @@ -103,7 +114,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { Conditions: []Condition{ &CooldownCondition{ ChainID: "consolidation_on_request", - Duration: 5 * time.Minute, + Duration: deps.cooldown("consolidation_on_request", 5*time.Minute), }, }, Actions: []Action{ @@ -113,7 +124,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { Log: log, }, }, - Cooldown: 5 * time.Minute, + Cooldown: deps.cooldown("consolidation_on_request", 5*time.Minute), Priority: 100, Enabled: true, }) @@ -152,7 +163,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { Conditions: []Condition{ &CooldownCondition{ ChainID: "meta_on_consolidation_completed", - Duration: 30 * time.Minute, + Duration: deps.cooldown("meta_on_consolidation_completed", 30*time.Minute), }, }, Actions: []Action{ @@ -162,7 +173,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { Log: log, }, }, - Cooldown: 30 * time.Minute, + Cooldown: deps.cooldown("meta_on_consolidation_completed", 30*time.Minute), Priority: 40, Enabled: true, }) @@ -179,7 +190,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { Conditions: []Condition{ &CooldownCondition{ ChainID: "dreaming_on_episode_closed", - Duration: 10 * time.Minute, + Duration: deps.cooldown("dreaming_on_episode_closed", 10*time.Minute), }, }, Actions: []Action{ @@ -189,7 +200,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain { Log: log, }, }, - Cooldown: 10 * time.Minute, + Cooldown: deps.cooldown("dreaming_on_episode_closed", 10*time.Minute), Priority: 30, Enabled: true, }) diff --git a/internal/config/config.go b/internal/config/config.go index 9b125e93..c5308df5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -28,6 +28,7 @@ type Config struct { Episoding EpisodingConfig `yaml:"episoding"` Abstraction AbstractionConfig `yaml:"abstraction"` Orchestrator OrchestratorConfig `yaml:"orchestrator"` + Reactor ReactorConfig `yaml:"reactor"` MCP MCPConfig `yaml:"mcp"` AgentSDK AgentSDKConfig `yaml:"agent_sdk"` Training TrainingConfig `yaml:"training"` @@ -236,6 +237,9 @@ type ConsolidationConfig struct { // Never-recalled watcher memory archival NeverRecalledArchiveDays int `yaml:"never_recalled_archive_days"` + + // Startup delay + StartupDelaySec int `yaml:"startup_delay_sec"` // seconds before first consolidation cycle (default: 30) } // RetrievalConfig holds retrieval settings. @@ -342,14 +346,21 @@ type AbstractionConfig struct { // OrchestratorConfig configures the autonomous orchestrator. type OrchestratorConfig struct { - Enabled bool `yaml:"enabled"` - AdaptiveIntervals bool `yaml:"adaptive_intervals"` - MaxDBSizeMB int `yaml:"max_db_size_mb"` - SelfTestIntervalRaw string `yaml:"self_test_interval"` - SelfTestInterval time.Duration `yaml:"-"` - AutoRecovery bool `yaml:"auto_recovery"` - MonitorIntervalRaw string `yaml:"monitor_interval"` - MonitorInterval time.Duration `yaml:"-"` + Enabled bool `yaml:"enabled"` + AdaptiveIntervals bool `yaml:"adaptive_intervals"` + MaxDBSizeMB int `yaml:"max_db_size_mb"` + SelfTestIntervalRaw string `yaml:"self_test_interval"` + SelfTestInterval time.Duration `yaml:"-"` + AutoRecovery bool `yaml:"auto_recovery"` + MonitorIntervalRaw string `yaml:"monitor_interval"` + MonitorInterval time.Duration `yaml:"-"` + HealthReportIntervalRaw string `yaml:"health_report_interval"` // how often to write health reports (default: "5m") + HealthReportInterval time.Duration `yaml:"-"` +} + +// ReactorConfig configures the event-driven reactor engine. +type ReactorConfig struct { + Cooldowns map[string]string `yaml:"cooldowns"` // chain ID -> duration string (e.g., "30m", "1h") } // MCPConfig holds MCP server settings. @@ -621,6 +632,7 @@ func Default() *Config { SelfSustainingMinEvidence: 10, SelfSustainingMinStrength: 0.9, SelfSustainingDecay: 0.9999, + StartupDelaySec: 30, }, Retrieval: RetrievalConfig{ MaxHops: 3, @@ -717,9 +729,12 @@ func Default() *Config { SelfTestIntervalRaw: "12h", SelfTestInterval: 12 * time.Hour, AutoRecovery: true, - MonitorIntervalRaw: "5m", - MonitorInterval: 5 * time.Minute, + MonitorIntervalRaw: "5m", + MonitorInterval: 5 * time.Minute, + HealthReportIntervalRaw: "5m", + HealthReportInterval: 5 * time.Minute, }, + Reactor: ReactorConfig{}, MCP: MCPConfig{ Enabled: true, }, @@ -817,6 +832,7 @@ func (c *Config) process(configDir string) error { {c.Abstraction.IntervalRaw, &c.Abstraction.Interval, "abstraction.interval"}, {c.Orchestrator.SelfTestIntervalRaw, &c.Orchestrator.SelfTestInterval, "orchestrator.self_test_interval"}, {c.Orchestrator.MonitorIntervalRaw, &c.Orchestrator.MonitorInterval, "orchestrator.monitor_interval"}, + {c.Orchestrator.HealthReportIntervalRaw, &c.Orchestrator.HealthReportInterval, "orchestrator.health_report_interval"}, {c.Metacognition.ReflectionLookbackRaw, &c.Metacognition.ReflectionLookback, "metacognition.reflection_lookback"}, {c.Metacognition.DeadMemoryWindowRaw, &c.Metacognition.DeadMemoryWindow, "metacognition.dead_memory_window"}, {c.Dreaming.DeadMemoryWindowRaw, &c.Dreaming.DeadMemoryWindow, "dreaming.dead_memory_window"},