Skip to content

Commit 4a16fd9

Browse files
committed
feat(agent): migrate metrics to OpenTelemetry SDK
1 parent b3ad334 commit 4a16fd9

File tree

4 files changed

+212
-63
lines changed

4 files changed

+212
-63
lines changed

cmd/agent/main.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
package main
44

55
import (
6+
"context"
67
"flag"
78
"os"
89

910
_ "k8s.io/client-go/plugin/pkg/client/auth"
1011

12+
"github.com/prometheus/client_golang/prometheus"
1113
"k8s.io/apimachinery/pkg/runtime"
1214
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
1315
ctrl "sigs.k8s.io/controller-runtime"
@@ -17,6 +19,7 @@ import (
1719
impdevv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
1820
"github.com/syscode-labs/imp/internal/agent"
1921
"github.com/syscode-labs/imp/internal/agent/network"
22+
"github.com/syscode-labs/imp/internal/telemetry"
2023
)
2124

2225
func main() {
@@ -57,7 +60,18 @@ func main() {
5760
os.Exit(1)
5861
}
5962

60-
mc := agent.NewVMMetricsCollector()
63+
agentReg := prometheus.NewRegistry()
64+
agentReg.MustRegister(
65+
prometheus.NewGoCollector(),
66+
prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
67+
)
68+
mp, shutdownTelemetry, err := telemetry.SetupMeterProvider(context.Background(), agentReg)
69+
if err != nil {
70+
log.Error(err, "unable to set up telemetry")
71+
os.Exit(1)
72+
}
73+
defer func() { _ = shutdownTelemetry(context.Background()) }()
74+
mc := agent.NewVMMetricsCollector(mp.Meter("imp.agent"), agentReg)
6175

6276
// IMP_STUB_DRIVER=true: StubDriver (CI, test clusters, no KVM needed).
6377
// Otherwise: FirecrackerDriver (reads FC_BIN, FC_SOCK_DIR, FC_KERNEL env vars).

internal/agent/firecracker_driver_test.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import (
1111
"time"
1212

1313
firecracker "github.com/firecracker-microvm/firecracker-go-sdk"
14+
"github.com/prometheus/client_golang/prometheus"
15+
otelprometheus "go.opentelemetry.io/otel/exporters/prometheus"
16+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
1417
"google.golang.org/grpc"
1518
"k8s.io/apimachinery/pkg/runtime"
1619
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -21,6 +24,20 @@ import (
2124
pb "github.com/syscode-labs/imp/internal/proto/guest"
2225
)
2326

27+
// newTestMetricsCollector creates a VMMetricsCollector backed by a throw-away OTel meter
28+
// and Prometheus registry, suitable for use in unit tests.
29+
func newTestMetricsCollector(t *testing.T) *VMMetricsCollector {
30+
t.Helper()
31+
reg := prometheus.NewRegistry()
32+
exp, err := otelprometheus.New(otelprometheus.WithRegisterer(reg))
33+
if err != nil {
34+
t.Fatal(err)
35+
}
36+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exp))
37+
t.Cleanup(func() { _ = mp.Shutdown(context.Background()) })
38+
return NewVMMetricsCollector(mp.Meter("test"), reg)
39+
}
40+
2441
type fakeRootfsBuilder struct {
2542
buildCalled bool
2643
buildCompositeCalled bool
@@ -635,7 +652,7 @@ func TestFirecrackerDriver_Snapshot_noVM_returnsError(t *testing.T) {
635652

636653
func TestFirecrackerDriver_HasMetricsAndNodeNameFields(t *testing.T) {
637654
d := &FirecrackerDriver{
638-
Metrics: NewVMMetricsCollector(),
655+
Metrics: newTestMetricsCollector(t),
639656
NodeName: "node-1",
640657
procs: make(map[string]*fcProc),
641658
}
@@ -692,7 +709,7 @@ func TestFirecrackerDriver_PollMetrics(t *testing.T) {
692709
metricsInterval = 1 * time.Millisecond
693710
defer func() { metricsInterval = old }()
694711

695-
mc := NewVMMetricsCollector()
712+
mc := newTestMetricsCollector(t)
696713
d := &FirecrackerDriver{
697714
Metrics: mc,
698715
NodeName: "node-1",

internal/agent/metrics.go

Lines changed: 125 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,85 +3,160 @@
33
package agent
44

55
import (
6+
"context"
67
"net/http"
78
"strings"
9+
"sync"
810

911
"github.com/prometheus/client_golang/prometheus"
1012
"github.com/prometheus/client_golang/prometheus/promhttp"
13+
"go.opentelemetry.io/otel/attribute"
14+
"go.opentelemetry.io/otel/metric"
1115
)
1216

1317
const metricsPort = ":9090"
1418

19+
type vmStateEntry struct {
20+
state string
21+
node string
22+
}
23+
24+
type guestData struct {
25+
cpu float64
26+
mem float64
27+
disk float64
28+
node string
29+
class string
30+
}
31+
1532
// VMMetricsCollector holds per-VM metric state for the node agent.
1633
type VMMetricsCollector struct {
17-
vmState *prometheus.GaugeVec
18-
guestCPU *prometheus.GaugeVec
19-
guestMem *prometheus.GaugeVec
20-
guestDisk *prometheus.GaugeVec
21-
reg *prometheus.Registry
34+
mu sync.RWMutex
35+
vmStates map[string]vmStateEntry // "ns/name" → {state, node}
36+
guestMetrics map[string]*guestData // "ns/name" → data
37+
gatherer prometheus.Gatherer
2238
}
2339

24-
// NewVMMetricsCollector creates a new collector with its own registry.
25-
func NewVMMetricsCollector() *VMMetricsCollector {
26-
reg := prometheus.NewRegistry()
40+
// NewVMMetricsCollector creates a new collector using the provided OTel meter.
41+
// gatherer is the Prometheus registry used by the OTel Prometheus exporter;
42+
// it is used to serve the /metrics HTTP handler.
43+
func NewVMMetricsCollector(meter metric.Meter, gatherer prometheus.Gatherer) *VMMetricsCollector {
2744
c := &VMMetricsCollector{
28-
vmState: prometheus.NewGaugeVec(prometheus.GaugeOpts{
29-
Name: "imp_vm_state",
30-
Help: "Current VM state (1 = active state).",
31-
}, []string{"impvm", "namespace", "node", "state"}),
32-
guestCPU: prometheus.NewGaugeVec(prometheus.GaugeOpts{
33-
Name: "imp_vm_guest_cpu_usage_ratio",
34-
Help: "Guest VM CPU usage ratio (0.0–1.0).",
35-
}, []string{"impvm", "namespace", "node", "impvmclass"}),
36-
guestMem: prometheus.NewGaugeVec(prometheus.GaugeOpts{
37-
Name: "imp_vm_guest_memory_used_bytes",
38-
Help: "Guest VM memory used bytes.",
39-
}, []string{"impvm", "namespace", "node", "impvmclass"}),
40-
guestDisk: prometheus.NewGaugeVec(prometheus.GaugeOpts{
41-
Name: "imp_vm_guest_disk_used_bytes",
42-
Help: "Guest VM root disk used bytes.",
43-
}, []string{"impvm", "namespace", "node", "impvmclass"}),
44-
reg: reg,
45+
vmStates: make(map[string]vmStateEntry),
46+
guestMetrics: make(map[string]*guestData),
47+
gatherer: gatherer,
4548
}
46-
reg.MustRegister(c.vmState, c.guestCPU, c.guestMem, c.guestDisk)
47-
reg.MustRegister(prometheus.NewGoCollector(), prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
49+
50+
_, _ = meter.Float64ObservableGauge(
51+
"imp_vm_state",
52+
metric.WithDescription("Current VM state (1 = active state)."),
53+
metric.WithFloat64Callback(func(_ context.Context, o metric.Float64Observer) error {
54+
c.mu.RLock()
55+
defer c.mu.RUnlock()
56+
for key, entry := range c.vmStates {
57+
ns, name := splitKey(key)
58+
o.Observe(1, metric.WithAttributes(
59+
attribute.String("impvm", name),
60+
attribute.String("namespace", ns),
61+
attribute.String("node", entry.node),
62+
attribute.String("state", entry.state),
63+
))
64+
}
65+
return nil
66+
}),
67+
)
68+
69+
_, _ = meter.Float64ObservableGauge(
70+
"imp_vm_guest_cpu_usage_ratio",
71+
metric.WithDescription("Guest VM CPU usage ratio (0.0–1.0)."),
72+
metric.WithFloat64Callback(func(_ context.Context, o metric.Float64Observer) error {
73+
c.mu.RLock()
74+
defer c.mu.RUnlock()
75+
for key, d := range c.guestMetrics {
76+
ns, name := splitKey(key)
77+
o.Observe(d.cpu, metric.WithAttributes(
78+
attribute.String("impvm", name),
79+
attribute.String("namespace", ns),
80+
attribute.String("node", d.node),
81+
attribute.String("impvmclass", d.class),
82+
))
83+
}
84+
return nil
85+
}),
86+
)
87+
88+
_, _ = meter.Float64ObservableGauge(
89+
"imp_vm_guest_memory_used_bytes",
90+
metric.WithDescription("Guest VM memory used bytes."),
91+
metric.WithFloat64Callback(func(_ context.Context, o metric.Float64Observer) error {
92+
c.mu.RLock()
93+
defer c.mu.RUnlock()
94+
for key, d := range c.guestMetrics {
95+
ns, name := splitKey(key)
96+
o.Observe(d.mem, metric.WithAttributes(
97+
attribute.String("impvm", name),
98+
attribute.String("namespace", ns),
99+
attribute.String("node", d.node),
100+
attribute.String("impvmclass", d.class),
101+
))
102+
}
103+
return nil
104+
}),
105+
)
106+
107+
_, _ = meter.Float64ObservableGauge(
108+
"imp_vm_guest_disk_used_bytes",
109+
metric.WithDescription("Guest VM root disk used bytes."),
110+
metric.WithFloat64Callback(func(_ context.Context, o metric.Float64Observer) error {
111+
c.mu.RLock()
112+
defer c.mu.RUnlock()
113+
for key, d := range c.guestMetrics {
114+
ns, name := splitKey(key)
115+
o.Observe(d.disk, metric.WithAttributes(
116+
attribute.String("impvm", name),
117+
attribute.String("namespace", ns),
118+
attribute.String("node", d.node),
119+
attribute.String("impvmclass", d.class),
120+
))
121+
}
122+
return nil
123+
}),
124+
)
125+
48126
return c
49127
}
50128

51-
// SetVMState sets the imp_vm_state gauge for a VM. key = "namespace/name".
52-
// Clears any previous state series for this VM so only one state is active at a time.
129+
// SetVMState sets the current state for a VM. Only one state is active per VM at a time.
53130
func (c *VMMetricsCollector) SetVMState(key, state, node string) {
54-
ns, name := splitKey(key)
55-
// Remove stale state series before setting the new one to avoid double-counting.
56-
c.vmState.DeletePartialMatch(prometheus.Labels{"impvm": name, "namespace": ns, "node": node})
57-
c.vmState.WithLabelValues(name, ns, node, state).Set(1)
131+
c.mu.Lock()
132+
defer c.mu.Unlock()
133+
c.vmStates[key] = vmStateEntry{state: state, node: node}
58134
}
59135

60136
// SetGuestMetrics updates guest agent metrics for a VM.
61137
func (c *VMMetricsCollector) SetGuestMetrics(key, node, impvmclass string, cpu float64, mem, disk int64) {
62-
ns, name := splitKey(key)
63-
c.guestCPU.WithLabelValues(name, ns, node, impvmclass).Set(cpu)
64-
c.guestMem.WithLabelValues(name, ns, node, impvmclass).Set(float64(mem))
65-
c.guestDisk.WithLabelValues(name, ns, node, impvmclass).Set(float64(disk))
138+
c.mu.Lock()
139+
defer c.mu.Unlock()
140+
c.guestMetrics[key] = &guestData{
141+
cpu: cpu,
142+
mem: float64(mem),
143+
disk: float64(disk),
144+
node: node,
145+
class: impvmclass,
146+
}
66147
}
67148

68-
// ClearVM removes all metric series for a VM when it's deleted.
149+
// ClearVM removes all metric state for a VM when it is deleted.
69150
func (c *VMMetricsCollector) ClearVM(key string) {
70-
ns, name := splitKey(key)
71-
c.vmState.DeletePartialMatch(prometheus.Labels{"impvm": name, "namespace": ns})
72-
c.guestCPU.DeletePartialMatch(prometheus.Labels{"impvm": name, "namespace": ns})
73-
c.guestMem.DeletePartialMatch(prometheus.Labels{"impvm": name, "namespace": ns})
74-
c.guestDisk.DeletePartialMatch(prometheus.Labels{"impvm": name, "namespace": ns})
75-
}
76-
77-
// NewMetricsHandler returns an HTTP handler for the default Prometheus registry.
78-
func NewMetricsHandler() http.Handler {
79-
return promhttp.Handler()
151+
c.mu.Lock()
152+
defer c.mu.Unlock()
153+
delete(c.vmStates, key)
154+
delete(c.guestMetrics, key)
80155
}
81156

82-
// NewMetricsHandlerWithCollector returns an HTTP handler for the given collector's registry.
157+
// NewMetricsHandlerWithCollector returns an HTTP handler for the collector's Prometheus registry.
83158
func NewMetricsHandlerWithCollector(c *VMMetricsCollector) http.Handler {
84-
return promhttp.HandlerFor(c.reg, promhttp.HandlerOpts{})
159+
return promhttp.HandlerFor(c.gatherer, promhttp.HandlerOpts{})
85160
}
86161

87162
func splitKey(key string) (ns, name string) {

internal/agent/metrics_test.go

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,80 @@
33
package agent_test
44

55
import (
6+
"context"
67
"net/http"
78
"net/http/httptest"
89
"strings"
910
"testing"
1011

12+
"github.com/prometheus/client_golang/prometheus"
13+
otelprometheus "go.opentelemetry.io/otel/exporters/prometheus"
14+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
15+
1116
"github.com/syscode-labs/imp/internal/agent"
1217
)
1318

14-
func TestMetricsHandler_serves(t *testing.T) {
15-
h := agent.NewMetricsHandler()
19+
func newTestCollector(t *testing.T) (*agent.VMMetricsCollector, http.Handler) {
20+
t.Helper()
21+
reg := prometheus.NewRegistry()
22+
exporter, err := otelprometheus.New(otelprometheus.WithRegisterer(reg))
23+
if err != nil {
24+
t.Fatal(err)
25+
}
26+
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))
27+
t.Cleanup(func() { _ = mp.Shutdown(context.Background()) })
28+
29+
mc := agent.NewVMMetricsCollector(mp.Meter("test"), reg)
30+
h := agent.NewMetricsHandlerWithCollector(mc)
31+
return mc, h
32+
}
33+
34+
func TestVMMetrics_vmStateAppears(t *testing.T) {
35+
mc, h := newTestCollector(t)
36+
mc.SetVMState("default/test-vm", "Running", "test-node")
37+
1638
req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
1739
w := httptest.NewRecorder()
1840
h.ServeHTTP(w, req)
41+
1942
if w.Code != http.StatusOK {
2043
t.Errorf("status = %d, want 200", w.Code)
2144
}
22-
if !strings.Contains(w.Body.String(), "go_") {
23-
t.Error("expected Prometheus default Go metrics in response")
45+
if !strings.Contains(w.Body.String(), "imp_vm_state") {
46+
t.Errorf("expected imp_vm_state in output, got:\n%s", w.Body.String())
2447
}
2548
}
2649

27-
func TestVMMetrics_registered(t *testing.T) {
28-
collector := agent.NewVMMetricsCollector()
29-
collector.SetVMState("default/test-vm", "Running", "test-node")
30-
h := agent.NewMetricsHandlerWithCollector(collector)
50+
func TestVMMetrics_clearVMRemovesState(t *testing.T) {
51+
mc, h := newTestCollector(t)
52+
mc.SetVMState("default/test-vm", "Running", "test-node")
53+
mc.ClearVM("default/test-vm")
3154

3255
req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
3356
w := httptest.NewRecorder()
3457
h.ServeHTTP(w, req)
3558

36-
if !strings.Contains(w.Body.String(), "imp_vm_state") {
37-
t.Errorf("expected imp_vm_state in metrics output, got:\n%s", w.Body.String())
59+
if strings.Contains(w.Body.String(), `impvm="test-vm"`) {
60+
t.Errorf("expected test-vm to be absent after ClearVM, got:\n%s", w.Body.String())
61+
}
62+
}
63+
64+
func TestVMMetrics_guestMetricsAppear(t *testing.T) {
65+
mc, h := newTestCollector(t)
66+
mc.SetGuestMetrics("default/test-vm", "test-node", "small", 0.5, 512*1024*1024, 1024*1024*1024)
67+
68+
req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
69+
w := httptest.NewRecorder()
70+
h.ServeHTTP(w, req)
71+
72+
body := w.Body.String()
73+
for _, metric := range []string{
74+
"imp_vm_guest_cpu_usage_ratio",
75+
"imp_vm_guest_memory_used_bytes",
76+
"imp_vm_guest_disk_used_bytes",
77+
} {
78+
if !strings.Contains(body, metric) {
79+
t.Errorf("expected %s in output, got:\n%s", metric, body)
80+
}
3881
}
3982
}

0 commit comments

Comments
 (0)