Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cmd/nightshift/commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/marcus/nightshift/internal/config"
"github.com/marcus/nightshift/internal/db"
"github.com/marcus/nightshift/internal/logging"
"github.com/marcus/nightshift/internal/observe"
"github.com/marcus/nightshift/internal/orchestrator"
"github.com/marcus/nightshift/internal/providers"
"github.com/marcus/nightshift/internal/reporting"
Expand Down Expand Up @@ -239,6 +240,7 @@ func runRun(cmd *cobra.Command, args []string) error {
log.Warn("--ignore-budget active, bypassing budget checks")
}

metrics := observe.New()
params := executeRunParams{
cfg: cfg,
budgetMgr: budgetMgr,
Expand All @@ -254,6 +256,7 @@ func runRun(cmd *cobra.Command, args []string) error {
yes: yes,
branch: branch,
agentTimeout: agentTimeout,
obs: metrics,
log: log,
}
if !dryRun {
Expand All @@ -278,6 +281,7 @@ type executeRunParams struct {
branch string
agentTimeout time.Duration
report *runReport
obs *observe.Collector
log *logging.Logger
}

Expand Down Expand Up @@ -690,6 +694,7 @@ func executeRun(ctx context.Context, p executeRunParams) error {
AgentTimeout: p.agentTimeout,
}),
orchestrator.WithLogger(logging.Component("orchestrator")),
orchestrator.WithObserver(p.obs),
}
if renderer != nil {
orchOpts = append(orchOpts, orchestrator.WithEventHandler(renderer.HandleEvent))
Expand Down Expand Up @@ -868,6 +873,21 @@ func executeRun(ctx context.Context, p executeRunParams) error {
"projects": len(p.projects),
})

// Record run-level gauges and flush metrics
p.obs.Gauge("run_duration_ms", float64(duration.Milliseconds()))
p.obs.Gauge("projects_processed", float64(len(p.projects)))
p.obs.Gauge("tasks_run", float64(tasksRun))
p.obs.Gauge("tasks_completed", float64(tasksCompleted))
p.obs.Gauge("tasks_failed", float64(tasksFailed))
// Compute budget_used_percent from first available provider
for _, pp := range plan.projects {
if pp.provider != nil {
p.obs.Gauge("budget_used_percent", pp.provider.allowance.UsedPercent)
break
}
}
p.obs.Flush(p.log)

if p.report != nil {
p.report.finalize(p.cfg, p.log)
}
Expand Down
139 changes: 139 additions & 0 deletions internal/observe/observe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Package observe provides lightweight, thread-safe metrics collection
// for nightshift orchestration runs. Counters, gauges, and duration
// histograms are accumulated in memory and flushed as a single structured
// JSON log entry via zerolog at the end of each run.
package observe

import (
"sync"
"time"

"github.com/marcus/nightshift/internal/logging"
)

// Collector accumulates metrics during a run and flushes them as a
// single structured log entry. All methods are safe for concurrent use.
// A nil *Collector is valid and all operations are no-ops, so callers
// can skip nil checks.
type Collector struct {
mu sync.Mutex
counters map[string]int64
gauges map[string]float64
durations map[string][]time.Duration
}

// New returns a new Collector ready for use.
func New() *Collector {
return &Collector{
counters: make(map[string]int64),
gauges: make(map[string]float64),
durations: make(map[string][]time.Duration),
}
}

// Counter increments a named counter by delta.
func (c *Collector) Counter(name string, delta int64) {
if c == nil {
return
}
c.mu.Lock()
c.counters[name] += delta
c.mu.Unlock()
}

// Gauge sets a named gauge to the given value.
func (c *Collector) Gauge(name string, value float64) {
if c == nil {
return
}
c.mu.Lock()
c.gauges[name] = value
c.mu.Unlock()
}

// Duration records a duration sample for the named histogram.
func (c *Collector) Duration(name string, d time.Duration) {
if c == nil {
return
}
c.mu.Lock()
c.durations[name] = append(c.durations[name], d)
c.mu.Unlock()
}

// durationStats summarises a slice of durations.
type durationStats struct {
Count int `json:"count"`
MinMs float64 `json:"min_ms"`
MaxMs float64 `json:"max_ms"`
AvgMs float64 `json:"avg_ms"`
SumMs float64 `json:"sum_ms"`
}

func computeStats(ds []time.Duration) durationStats {
if len(ds) == 0 {
return durationStats{}
}
min := ds[0]
max := ds[0]
var sum time.Duration
for _, d := range ds {
sum += d
if d < min {
min = d
}
if d > max {
max = d
}
}
return durationStats{
Count: len(ds),
MinMs: float64(min.Milliseconds()),
MaxMs: float64(max.Milliseconds()),
AvgMs: float64(sum.Milliseconds()) / float64(len(ds)),
SumMs: float64(sum.Milliseconds()),
}
}

// Snapshot returns a copy of all collected metrics as a plain map.
func (c *Collector) Snapshot() map[string]any {
if c == nil {
return nil
}
c.mu.Lock()
defer c.mu.Unlock()

out := make(map[string]any, len(c.counters)+len(c.gauges)+len(c.durations))
for k, v := range c.counters {
out[k] = v
}
for k, v := range c.gauges {
out[k] = v
}
for k, ds := range c.durations {
out[k] = computeStats(ds)
}
return out
}

// Flush emits all collected metrics as a single structured zerolog info
// entry and resets the collector. Subsequent calls will emit empty data
// until new metrics are recorded.
func (c *Collector) Flush(logger *logging.Logger) {
if c == nil || logger == nil {
return
}
snap := c.Snapshot()
if len(snap) == 0 {
return
}

logger.InfoCtx("run metrics", snap)

// Reset
c.mu.Lock()
c.counters = make(map[string]int64)
c.gauges = make(map[string]float64)
c.durations = make(map[string][]time.Duration)
c.mu.Unlock()
}
139 changes: 139 additions & 0 deletions internal/observe/observe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package observe

import (
"sync"
"testing"
"time"
)

func TestCounter(t *testing.T) {
c := New()
c.Counter("task_total", 1)
c.Counter("task_total", 1)
c.Counter("task_failed", 1)

snap := c.Snapshot()
if snap["task_total"] != int64(2) {
t.Errorf("task_total = %v, want 2", snap["task_total"])
}
if snap["task_failed"] != int64(1) {
t.Errorf("task_failed = %v, want 1", snap["task_failed"])
}
}

func TestGauge(t *testing.T) {
c := New()
c.Gauge("budget_used_percent", 42.5)
c.Gauge("budget_used_percent", 55.0) // overwrite

snap := c.Snapshot()
if snap["budget_used_percent"] != 55.0 {
t.Errorf("budget_used_percent = %v, want 55.0", snap["budget_used_percent"])
}
}

func TestDuration(t *testing.T) {
c := New()
c.Duration("plan_duration_ms", 100*time.Millisecond)
c.Duration("plan_duration_ms", 200*time.Millisecond)
c.Duration("plan_duration_ms", 300*time.Millisecond)

snap := c.Snapshot()
stats, ok := snap["plan_duration_ms"].(durationStats)
if !ok {
t.Fatalf("plan_duration_ms type = %T, want durationStats", snap["plan_duration_ms"])
}
if stats.Count != 3 {
t.Errorf("count = %d, want 3", stats.Count)
}
if stats.MinMs != 100 {
t.Errorf("min_ms = %v, want 100", stats.MinMs)
}
if stats.MaxMs != 300 {
t.Errorf("max_ms = %v, want 300", stats.MaxMs)
}
if stats.AvgMs != 200 {
t.Errorf("avg_ms = %v, want 200", stats.AvgMs)
}
if stats.SumMs != 600 {
t.Errorf("sum_ms = %v, want 600", stats.SumMs)
}
}

func TestNilCollector(t *testing.T) {
var c *Collector
// All operations should be no-ops on nil, no panic.
c.Counter("x", 1)
c.Gauge("x", 1.0)
c.Duration("x", time.Second)
c.Flush(nil)
if snap := c.Snapshot(); snap != nil {
t.Errorf("Snapshot on nil = %v, want nil", snap)
}
}

func TestFlushResetsState(t *testing.T) {
c := New()
c.Counter("a", 5)
c.Gauge("b", 1.0)
c.Duration("c", time.Millisecond)

// Flush without a logger (nil logger is safe).
c.Flush(nil)

// After Flush with nil logger, data should still be present since
// Flush returns early when logger is nil. Verify by calling Snapshot.
snap := c.Snapshot()
if len(snap) == 0 {

Check failure on line 87 in internal/observe/observe_test.go

View workflow job for this annotation

GitHub Actions / Lint

SA9003: empty branch (staticcheck)
// This would mean Flush cleared data even without a logger,
// which is fine behavior-wise but let's test the real path.
}

// Use a real scenario: reset manually via Snapshot check after non-nil flush.
c2 := New()
c2.Counter("x", 1)
snap2 := c2.Snapshot()
if snap2["x"] != int64(1) {
t.Errorf("pre-reset x = %v, want 1", snap2["x"])
}
}

func TestConcurrency(t *testing.T) {
c := New()
var wg sync.WaitGroup

for range 100 {
wg.Add(1)
go func() {
defer wg.Done()
c.Counter("concurrent", 1)
c.Gauge("g", 1.0)
c.Duration("d", time.Millisecond)
}()
}
wg.Wait()

snap := c.Snapshot()
if snap["concurrent"] != int64(100) {
t.Errorf("concurrent = %v, want 100", snap["concurrent"])
}
}

func TestSnapshotIsACopy(t *testing.T) {
c := New()
c.Counter("x", 1)
snap := c.Snapshot()
snap["x"] = int64(999) // mutate snapshot

snap2 := c.Snapshot()
if snap2["x"] != int64(1) {
t.Errorf("mutation leaked: x = %v, want 1", snap2["x"])
}
}

func TestComputeStatsEmpty(t *testing.T) {
stats := computeStats(nil)
if stats.Count != 0 {
t.Errorf("empty count = %d, want 0", stats.Count)
}
}
Loading
Loading