diff --git a/auditing/async.go b/auditing/async.go index 0ffa3cb6..d225bf2f 100644 --- a/auditing/async.go +++ b/auditing/async.go @@ -5,6 +5,8 @@ import ( "fmt" "log/slog" "time" + + "github.com/metal-stack/metal-lib/pkg/healthstatus" ) const ( @@ -84,3 +86,11 @@ func (a *asyncAuditing) Index(entry Entry) error { func (a *asyncAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) { return a.a.Search(ctx, filter) } + +func (a *asyncAuditing) ServiceName() string { + return a.a.ServiceName() +} + +func (a *asyncAuditing) Check(ctx context.Context) (healthstatus.HealthResult, error) { + return a.a.Check(ctx) +} diff --git a/auditing/async_test.go b/auditing/async_test.go index d526182a..98159342 100644 --- a/auditing/async_test.go +++ b/auditing/async_test.go @@ -3,12 +3,14 @@ package auditing import ( "context" "errors" + "fmt" "log/slog" "os" "sync" "testing" "time" + "github.com/metal-stack/metal-lib/pkg/healthstatus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -110,10 +112,11 @@ func Test_asyncAuditing_Index(t *testing.T) { } type testBackend struct { - mutex sync.Mutex - done chan bool - count int - idxFn func(count int) error + mutex sync.Mutex + done chan bool + count int + idxFn func(count int) error + health error } func (t *testBackend) Index(e Entry) error { @@ -135,3 +138,18 @@ func (t *testBackend) Index(e Entry) error { func (t *testBackend) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) { panic("not required") } + +func (t *testBackend) ServiceName() string { + return "test" +} + +func (t *testBackend) Check(ctx context.Context) (healthstatus.HealthResult, error) { + if t.health != nil { + return healthstatus.HealthResult{}, fmt.Errorf("audit backend is unhealthy: %s", t.health.Error()) + } + + return healthstatus.HealthResult{ + Status: healthstatus.HealthStatusHealthy, + Message: "audit backend is healthy", + }, nil +} diff --git a/auditing/auditing.go b/auditing/auditing.go index 3eb5b228..41024381 100644 --- a/auditing/auditing.go +++ b/auditing/auditing.go @@ -6,6 +6,8 @@ import ( "os" "path/filepath" "time" + + "github.com/metal-stack/metal-lib/pkg/healthstatus" ) type Config struct { @@ -15,14 +17,6 @@ type Config struct { IndexTimeout time.Duration } -type Interval string - -var ( - HourlyInterval Interval = "@hourly" - DailyInterval Interval = "@daily" - MonthlyInterval Interval = "@monthly" -) - type EntryType string const ( @@ -135,6 +129,9 @@ type Auditing interface { // By default only recent entries will be returned. // The returned entries will be sorted by timestamp in descending order. Search(context.Context, EntryFilter) ([]Entry, error) + + // Implements the health check interface + healthstatus.HealthCheck } func defaultComponent() (string, error) { diff --git a/auditing/memory.go b/auditing/memory.go index 5c5b6d11..7ee0550e 100644 --- a/auditing/memory.go +++ b/auditing/memory.go @@ -10,6 +10,11 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/metal-stack/metal-lib/pkg/healthstatus" +) + +const ( + MemoryBackendName = "memory" ) type ( @@ -43,7 +48,7 @@ func NewMemory(c Config, mc MemoryConfig) (Auditing, error) { a := &memoryAuditing{ component: c.Component, - log: c.Log.WithGroup("auditing").With("audit-backend", "memory"), + log: c.Log.WithGroup("auditing").With("audit-backend", MemoryBackendName), memory: []Entry{}, config: &mc, } @@ -201,3 +206,14 @@ func (a *memoryAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entr return entries, nil } + +func (a *memoryAuditing) ServiceName() string { + return MemoryBackendName +} + +func (a *memoryAuditing) Check(ctx context.Context) (healthstatus.HealthResult, error) { + return healthstatus.HealthResult{ + Message: fmt.Sprintf("audit backend %q is healthy", MemoryBackendName), + Status: healthstatus.HealthStatusHealthy, + }, nil +} diff --git a/auditing/memory_test.go b/auditing/memory_test.go index 72242b97..00d041b4 100644 --- a/auditing/memory_test.go +++ b/auditing/memory_test.go @@ -8,6 +8,8 @@ import ( "testing" "github.com/metal-stack/metal-lib/auditing" + "github.com/metal-stack/metal-lib/pkg/healthstatus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -31,4 +33,17 @@ func TestAuditing_Memory(t *testing.T) { tt.t(t, auditing) }) } + + auditing, err := auditing.NewMemory(auditing.Config{ + Log: slog.Default(), + }, auditing.MemoryConfig{}) + require.NoError(t, err) + + healthResult, err := auditing.Check(t.Context()) + + require.NoError(t, err) + assert.Equal(t, healthstatus.HealthResult{ + Status: healthstatus.HealthStatusHealthy, + Message: `audit backend "memory" is healthy`, + }, healthResult) } diff --git a/auditing/splunk.go b/auditing/splunk.go index 156ac629..cc44437e 100644 --- a/auditing/splunk.go +++ b/auditing/splunk.go @@ -7,12 +7,21 @@ import ( "encoding/json" "errors" "fmt" + "io" "log/slog" "net/http" "time" + + "github.com/metal-stack/metal-lib/pkg/healthstatus" ) -const spunkIndexTimeout = 5 * time.Second +const ( + SplunkBackendName = "splunk" + + spunkIndexTimeout = 5 * time.Second + // See on their docs in troubleshoot-http-event-collector (Possible_error_codes) + splunkHealthyCode = 17 +) type ( SplunkConfig struct { @@ -52,6 +61,12 @@ type ( // Event is the actual event data in whatever format you want: a string, a number, another JSON object, and so on. Event Entry `json:"event,omitempty"` } + + splunkRequestEndpoint struct { + path string + method string + body []byte + } ) // NewSplunk returns a new auditing backend for splunk. It supports the HTTP event collector interface. @@ -91,7 +106,7 @@ func NewSplunk(c Config, sc SplunkConfig) (Auditing, error) { a := &splunkAuditing{ component: c.Component, - log: c.Log.WithGroup("auditing").With("audit-backend", "splunk"), + log: c.Log.WithGroup("auditing").With("audit-backend", SplunkBackendName), indexTimeout: c.IndexTimeout, client: &http.Client{Transport: &http.Transport{TLSClientConfig: sc.TlsConfig}}, endpoint: endpoint, @@ -128,24 +143,84 @@ func (a *splunkAuditing) Index(entry Entry) error { ctx, cancel := context.WithTimeout(context.Background(), a.indexTimeout) defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.endpoint+"/services/collector", bytes.NewBuffer(e)) + _, err = a.splunkRequest(ctx, splunkRequestEndpoint{ + path: "/services/collector", + method: http.MethodPost, + body: e, + }) if err != nil { - return err + return fmt.Errorf("error indexing audit entry in splunk: %w", err) + } + + return nil +} + +func (a *splunkAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) { + return nil, fmt.Errorf("search not implemented for splunk audit backend") +} + +func (a *splunkAuditing) ServiceName() string { + return SplunkBackendName +} + +func (a *splunkAuditing) Check(ctx context.Context) (healthstatus.HealthResult, error) { + resp, err := a.splunkRequest(ctx, splunkRequestEndpoint{ + path: "/services/collector/health", + method: http.MethodGet, + body: nil, + }) + if err != nil { + return healthstatus.HealthResult{}, fmt.Errorf("audit backend %q is unhealthy, collector is unhealthy: %w", SplunkBackendName, err) + } + + type healthResp struct { + Text string `json:"text"` + Code int `json:"code"` + } + + health := healthResp{} + + if err := json.Unmarshal(resp, &health); err != nil { + return healthstatus.HealthResult{}, fmt.Errorf("unable to unmarshal health response: %w", err) + } + + if health.Code != splunkHealthyCode { + return healthstatus.HealthResult{}, fmt.Errorf("audit backend %q is degraded: %s", SplunkBackendName, health.Text) + } + + return healthstatus.HealthResult{ + Message: fmt.Sprintf("audit backend %q is healthy: %s", SplunkBackendName, health.Text), + Status: healthstatus.HealthStatusHealthy, + }, nil +} + +func (a *splunkAuditing) splunkRequest(ctx context.Context, ep splunkRequestEndpoint) ([]byte, error) { + ctx, cancel := context.WithTimeout(ctx, a.indexTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, ep.method, a.endpoint+ep.path, bytes.NewBuffer(ep.body)) + if err != nil { + return nil, err } req.Header.Add("Authorization", "Splunk "+a.hecToken) resp, err := a.client.Do(req) if err != nil { - return fmt.Errorf("error indexing audit entry in splunk: %w", err) + return nil, fmt.Errorf("error during splunk request: %w", err) } defer func() { _ = resp.Body.Close() }() - return nil -} + bytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response: %w", err) + } -func (a *splunkAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) { - return nil, fmt.Errorf("search not implemented for splunk audit backend") + if code := resp.StatusCode; code >= http.StatusBadRequest { + return nil, fmt.Errorf("splunk endpoint %q did not return ok (%d)", ep.path, code) + } + + return bytes, nil } diff --git a/auditing/splunk_test.go b/auditing/splunk_test.go index 21f61ec0..5ac9116b 100644 --- a/auditing/splunk_test.go +++ b/auditing/splunk_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/metal-stack/metal-lib/pkg/healthstatus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -101,3 +102,44 @@ func Test_splunkAuditing_Index(t *testing.T) { }) } } + +func Test_splunkAuditing_Health(t *testing.T) { + tests := []struct { + name string + want healthstatus.HealthResult + }{ + { + name: "healthy", + want: healthstatus.HealthResult{ + Status: healthstatus.HealthStatusHealthy, + Message: `audit backend "splunk" is healthy: HEC is healthy`, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/services/collector/health", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"text":"HEC is healthy","code":17}`)) + w.WriteHeader(http.StatusOK) + }) + server := httptest.NewServer(mux) + defer server.Close() + + a, err := NewSplunk(Config{ + Component: "metal-lib", + Log: slog.Default(), + }, SplunkConfig{ + Endpoint: server.URL, + HECToken: "test-hec", + Index: "test-index", + Host: "test-host", + }) + require.NoError(t, err) + + got, err := a.Check(t.Context()) + require.NoError(t, err) + require.Equal(t, tt.want, got) + }) + } +} diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index 856f2034..ce7510fa 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -12,11 +12,16 @@ import ( "github.com/jmoiron/sqlx" "github.com/lopezator/migrator" + "github.com/metal-stack/metal-lib/pkg/healthstatus" _ "github.com/lib/pq" ) -const timescaleDbIndexTimeout = 2 * time.Second +const ( + TimescaleDbBackendName = "timescaledb" + + timescaleDbIndexTimeout = 2 * time.Second +) type ( TimescaleDbConfig struct { @@ -106,7 +111,7 @@ func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) { a := ×caleAuditing{ component: c.Component, indexTimeout: c.IndexTimeout, - log: c.Log.WithGroup("auditing").With("audit-backend", "timescaledb"), + log: c.Log.WithGroup("auditing").With("audit-backend", TimescaleDbBackendName), db: db, config: &tc, } @@ -371,3 +376,18 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E return entries, nil } + +func (a *timescaleAuditing) ServiceName() string { + return TimescaleDbBackendName +} + +func (a *timescaleAuditing) Check(ctx context.Context) (healthstatus.HealthResult, error) { + if err := a.db.PingContext(ctx); err != nil { + return healthstatus.HealthResult{}, fmt.Errorf("audit backend %q is unhealthy, database is not reachable: %w", TimescaleDbBackendName, err) + } + + return healthstatus.HealthResult{ + Message: fmt.Sprintf("audit backend %q is healthy", TimescaleDbBackendName), + Status: healthstatus.HealthStatusHealthy, + }, nil +} diff --git a/auditing/timescaledb_integration_test.go b/auditing/timescaledb_integration_test.go index 40cf5fad..4b2c3299 100644 --- a/auditing/timescaledb_integration_test.go +++ b/auditing/timescaledb_integration_test.go @@ -10,6 +10,8 @@ import ( "github.com/jmoiron/sqlx" "github.com/metal-stack/metal-lib/auditing" + "github.com/metal-stack/metal-lib/pkg/healthstatus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" @@ -34,6 +36,14 @@ func TestAuditing_TimescaleDB(t *testing.T) { tt.t(t, aud) }) } + + healthResult, err := aud.Check(t.Context()) + + require.NoError(t, err) + assert.Equal(t, healthstatus.HealthResult{ + Status: healthstatus.HealthStatusHealthy, + Message: `audit backend "timescaledb" is healthy`, + }, healthResult) } func StartTimescaleDB(t testing.TB, config auditing.Config) (testcontainers.Container, *sqlx.DB, auditing.Auditing) {