Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions cmd/mnemonic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,12 +1621,13 @@ func serveCommand(configPath string) {
var orch *orchestrator.Orchestrator
if cfg.Orchestrator.Enabled {
orch = orchestrator.NewOrchestrator(memStore, wrap("orchestrator"), orchestrator.OrchestratorConfig{
AdaptiveIntervals: cfg.Orchestrator.AdaptiveIntervals,
MaxDBSizeMB: cfg.Orchestrator.MaxDBSizeMB,
SelfTestInterval: cfg.Orchestrator.SelfTestInterval,
AutoRecovery: cfg.Orchestrator.AutoRecovery,
HealthReportPath: filepath.Join(filepath.Dir(cfg.Store.DBPath), "health.json"),
MonitorInterval: cfg.Orchestrator.MonitorInterval,
AdaptiveIntervals: cfg.Orchestrator.AdaptiveIntervals,
MaxDBSizeMB: cfg.Orchestrator.MaxDBSizeMB,
SelfTestInterval: cfg.Orchestrator.SelfTestInterval,
AutoRecovery: cfg.Orchestrator.AutoRecovery,
HealthReportPath: filepath.Join(filepath.Dir(cfg.Store.DBPath), "health.json"),
MonitorInterval: cfg.Orchestrator.MonitorInterval,
HealthReportInterval: cfg.Orchestrator.HealthReportInterval,
}, log)

if err := orch.Start(rootCtx, bus); err != nil {
Expand All @@ -1643,9 +1644,24 @@ func serveCommand(configPath string) {
reactorLog := log.With("component", "reactor")
reactorEngine := reactor.NewEngine(memStore, bus, reactorLog)

// Parse reactor cooldown overrides from config
var cooldownOverrides map[string]time.Duration
if len(cfg.Reactor.Cooldowns) > 0 {
cooldownOverrides = make(map[string]time.Duration, len(cfg.Reactor.Cooldowns))
for chainID, durStr := range cfg.Reactor.Cooldowns {
d, err := time.ParseDuration(durStr)
if err != nil {
log.Warn("invalid reactor cooldown duration, ignoring", "chain_id", chainID, "value", durStr, "error", err)
continue
}
cooldownOverrides[chainID] = d
}
}

deps := reactor.ChainDeps{
MaxDBSizeMB: cfg.Orchestrator.MaxDBSizeMB,
Logger: reactorLog,
MaxDBSizeMB: cfg.Orchestrator.MaxDBSizeMB,
CooldownOverrides: cooldownOverrides,
Logger: reactorLog,
}
if consolidator != nil {
deps.ConsolidationTrigger = consolidator.GetTriggerChannel()
Expand Down Expand Up @@ -1913,6 +1929,7 @@ func toConsolidationConfig(cfg *config.Config) consolidation.ConsolidationConfig
SelfSustainingMinStrength: float32(cfg.Consolidation.SelfSustainingMinStrength),
SelfSustainingDecay: float32(cfg.Consolidation.SelfSustainingDecay),
NeverRecalledArchiveDays: cfg.Consolidation.NeverRecalledArchiveDays,
StartupDelay: time.Duration(cfg.Consolidation.StartupDelaySec) * time.Second,
}
}

Expand Down
10 changes: 8 additions & 2 deletions internal/agent/consolidation/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type ConsolidationConfig struct {

// Never-recalled watcher memory archival
NeverRecalledArchiveDays int // archive non-MCP memories with 0 access after this many days (default 30, 0=disabled)

// Startup delay
StartupDelay time.Duration // grace period before first cycle (default 30s)
}

// DefaultConfig returns sensible defaults for consolidation.
Expand Down Expand Up @@ -204,8 +207,11 @@ func (ca *ConsolidationAgent) consolidationLoop() {
ticker := time.NewTicker(ca.config.Interval)
defer ticker.Stop()

// Run one cycle shortly after startup (30s grace period)
startupTimer := time.NewTimer(30 * time.Second)
startupDelay := ca.config.StartupDelay
if startupDelay <= 0 {
startupDelay = 30 * time.Second
}
startupTimer := time.NewTimer(startupDelay)
defer startupTimer.Stop()

runAndLog := func(trigger string) {
Expand Down
19 changes: 12 additions & 7 deletions internal/agent/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (

// OrchestratorConfig configures the autonomous orchestrator.
type OrchestratorConfig struct {
AdaptiveIntervals bool
MaxDBSizeMB int
SelfTestInterval time.Duration
AutoRecovery bool
HealthReportPath string // e.g. "~/.mnemonic/health.json"
MonitorInterval time.Duration
AdaptiveIntervals bool
MaxDBSizeMB int
SelfTestInterval time.Duration
AutoRecovery bool
HealthReportPath string // e.g. "~/.mnemonic/health.json"
MonitorInterval time.Duration
HealthReportInterval time.Duration // how often to write health reports (default 5m)
}

// HealthReport is the machine-readable health status written periodically.
Expand Down Expand Up @@ -337,7 +338,11 @@ func (o *Orchestrator) runSelfTest(ctx context.Context) {
func (o *Orchestrator) healthReportLoop() {
defer o.wg.Done()

ticker := time.NewTicker(5 * time.Minute)
reportInterval := o.config.HealthReportInterval
if reportInterval <= 0 {
reportInterval = 5 * time.Minute
}
ticker := time.NewTicker(reportInterval)
defer ticker.Stop()

// Write initial report
Expand Down
31 changes: 21 additions & 10 deletions internal/agent/reactor/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,20 @@ type ChainDeps struct {
DreamingTrigger chan<- struct{}
IncrementAutonomous func()
MaxDBSizeMB int
CooldownOverrides map[string]time.Duration // chain ID -> cooldown override
Logger *slog.Logger
}

// cooldown returns the override duration for a chain if set, otherwise the default.
func (d ChainDeps) cooldown(chainID string, defaultDuration time.Duration) time.Duration {
if d.CooldownOverrides != nil {
if override, ok := d.CooldownOverrides[chainID]; ok && override > 0 {
return override
}
}
return defaultDuration
}

// NewChainRegistry creates a registry with all built-in reactive chains.
func NewChainRegistry(deps ChainDeps) []*Chain {
log := deps.Logger
Expand All @@ -40,7 +51,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
},
&CooldownCondition{
ChainID: "meta_consolidation_on_dead_ratio",
Duration: 30 * time.Minute,
Duration: deps.cooldown("meta_consolidation_on_dead_ratio", 30*time.Minute),
},
},
Actions: []Action{
Expand All @@ -56,7 +67,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
Log: log,
},
},
Cooldown: 30 * time.Minute,
Cooldown: deps.cooldown("meta_consolidation_on_dead_ratio", 30*time.Minute),
Priority: 10,
Enabled: true,
})
Expand All @@ -72,7 +83,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
&DBSizeCondition{MaxSizeMB: deps.MaxDBSizeMB},
&CooldownCondition{
ChainID: "orch_consolidation_on_db_size",
Duration: 1 * time.Hour,
Duration: deps.cooldown("orch_consolidation_on_db_size", 1*time.Hour),
},
},
Actions: []Action{
Expand All @@ -87,7 +98,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
Increment: deps.IncrementAutonomous,
},
},
Cooldown: 1 * time.Hour,
Cooldown: deps.cooldown("orch_consolidation_on_db_size", 1*time.Hour),
Priority: 5,
Enabled: true,
})
Expand All @@ -103,7 +114,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
Conditions: []Condition{
&CooldownCondition{
ChainID: "consolidation_on_request",
Duration: 5 * time.Minute,
Duration: deps.cooldown("consolidation_on_request", 5*time.Minute),
},
},
Actions: []Action{
Expand All @@ -113,7 +124,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
Log: log,
},
},
Cooldown: 5 * time.Minute,
Cooldown: deps.cooldown("consolidation_on_request", 5*time.Minute),
Priority: 100,
Enabled: true,
})
Expand Down Expand Up @@ -152,7 +163,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
Conditions: []Condition{
&CooldownCondition{
ChainID: "meta_on_consolidation_completed",
Duration: 30 * time.Minute,
Duration: deps.cooldown("meta_on_consolidation_completed", 30*time.Minute),
},
},
Actions: []Action{
Expand All @@ -162,7 +173,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
Log: log,
},
},
Cooldown: 30 * time.Minute,
Cooldown: deps.cooldown("meta_on_consolidation_completed", 30*time.Minute),
Priority: 40,
Enabled: true,
})
Expand All @@ -179,7 +190,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
Conditions: []Condition{
&CooldownCondition{
ChainID: "dreaming_on_episode_closed",
Duration: 10 * time.Minute,
Duration: deps.cooldown("dreaming_on_episode_closed", 10*time.Minute),
},
},
Actions: []Action{
Expand All @@ -189,7 +200,7 @@ func NewChainRegistry(deps ChainDeps) []*Chain {
Log: log,
},
},
Cooldown: 10 * time.Minute,
Cooldown: deps.cooldown("dreaming_on_episode_closed", 10*time.Minute),
Priority: 30,
Enabled: true,
})
Expand Down
36 changes: 26 additions & 10 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
Episoding EpisodingConfig `yaml:"episoding"`
Abstraction AbstractionConfig `yaml:"abstraction"`
Orchestrator OrchestratorConfig `yaml:"orchestrator"`
Reactor ReactorConfig `yaml:"reactor"`
MCP MCPConfig `yaml:"mcp"`
AgentSDK AgentSDKConfig `yaml:"agent_sdk"`
Training TrainingConfig `yaml:"training"`
Expand Down Expand Up @@ -236,6 +237,9 @@ type ConsolidationConfig struct {

// Never-recalled watcher memory archival
NeverRecalledArchiveDays int `yaml:"never_recalled_archive_days"`

// Startup delay
StartupDelaySec int `yaml:"startup_delay_sec"` // seconds before first consolidation cycle (default: 30)
}

// RetrievalConfig holds retrieval settings.
Expand Down Expand Up @@ -342,14 +346,21 @@ type AbstractionConfig struct {

// OrchestratorConfig configures the autonomous orchestrator.
type OrchestratorConfig struct {
Enabled bool `yaml:"enabled"`
AdaptiveIntervals bool `yaml:"adaptive_intervals"`
MaxDBSizeMB int `yaml:"max_db_size_mb"`
SelfTestIntervalRaw string `yaml:"self_test_interval"`
SelfTestInterval time.Duration `yaml:"-"`
AutoRecovery bool `yaml:"auto_recovery"`
MonitorIntervalRaw string `yaml:"monitor_interval"`
MonitorInterval time.Duration `yaml:"-"`
Enabled bool `yaml:"enabled"`
AdaptiveIntervals bool `yaml:"adaptive_intervals"`
MaxDBSizeMB int `yaml:"max_db_size_mb"`
SelfTestIntervalRaw string `yaml:"self_test_interval"`
SelfTestInterval time.Duration `yaml:"-"`
AutoRecovery bool `yaml:"auto_recovery"`
MonitorIntervalRaw string `yaml:"monitor_interval"`
MonitorInterval time.Duration `yaml:"-"`
HealthReportIntervalRaw string `yaml:"health_report_interval"` // how often to write health reports (default: "5m")
HealthReportInterval time.Duration `yaml:"-"`
}

// ReactorConfig configures the event-driven reactor engine.
type ReactorConfig struct {
Cooldowns map[string]string `yaml:"cooldowns"` // chain ID -> duration string (e.g., "30m", "1h")
}

// MCPConfig holds MCP server settings.
Expand Down Expand Up @@ -621,6 +632,7 @@ func Default() *Config {
SelfSustainingMinEvidence: 10,
SelfSustainingMinStrength: 0.9,
SelfSustainingDecay: 0.9999,
StartupDelaySec: 30,
},
Retrieval: RetrievalConfig{
MaxHops: 3,
Expand Down Expand Up @@ -717,9 +729,12 @@ func Default() *Config {
SelfTestIntervalRaw: "12h",
SelfTestInterval: 12 * time.Hour,
AutoRecovery: true,
MonitorIntervalRaw: "5m",
MonitorInterval: 5 * time.Minute,
MonitorIntervalRaw: "5m",
MonitorInterval: 5 * time.Minute,
HealthReportIntervalRaw: "5m",
HealthReportInterval: 5 * time.Minute,
},
Reactor: ReactorConfig{},
MCP: MCPConfig{
Enabled: true,
},
Expand Down Expand Up @@ -817,6 +832,7 @@ func (c *Config) process(configDir string) error {
{c.Abstraction.IntervalRaw, &c.Abstraction.Interval, "abstraction.interval"},
{c.Orchestrator.SelfTestIntervalRaw, &c.Orchestrator.SelfTestInterval, "orchestrator.self_test_interval"},
{c.Orchestrator.MonitorIntervalRaw, &c.Orchestrator.MonitorInterval, "orchestrator.monitor_interval"},
{c.Orchestrator.HealthReportIntervalRaw, &c.Orchestrator.HealthReportInterval, "orchestrator.health_report_interval"},
{c.Metacognition.ReflectionLookbackRaw, &c.Metacognition.ReflectionLookback, "metacognition.reflection_lookback"},
{c.Metacognition.DeadMemoryWindowRaw, &c.Metacognition.DeadMemoryWindow, "metacognition.dead_memory_window"},
{c.Dreaming.DeadMemoryWindowRaw, &c.Dreaming.DeadMemoryWindow, "dreaming.dead_memory_window"},
Expand Down