Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions auditing/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"log/slog"
"time"

"github.com/metal-stack/metal-lib/pkg/healthstatus"
)

const (
Expand Down Expand Up @@ -84,3 +86,11 @@ func (a *asyncAuditing) Index(entry Entry) error {
func (a *asyncAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) {
return a.a.Search(ctx, filter)
}

func (a *asyncAuditing) ServiceName() string {
return a.a.ServiceName()
}

func (a *asyncAuditing) Check(ctx context.Context) (healthstatus.HealthResult, error) {
return a.a.Check(ctx)
}
26 changes: 22 additions & 4 deletions auditing/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package auditing
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"sync"
"testing"
"time"

"github.com/metal-stack/metal-lib/pkg/healthstatus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -110,10 +112,11 @@ func Test_asyncAuditing_Index(t *testing.T) {
}

type testBackend struct {
mutex sync.Mutex
done chan bool
count int
idxFn func(count int) error
mutex sync.Mutex
done chan bool
count int
idxFn func(count int) error
health error
}

func (t *testBackend) Index(e Entry) error {
Expand All @@ -135,3 +138,18 @@ func (t *testBackend) Index(e Entry) error {
func (t *testBackend) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) {
panic("not required")
}

func (t *testBackend) ServiceName() string {
return "test"
}

func (t *testBackend) Check(ctx context.Context) (healthstatus.HealthResult, error) {
if t.health != nil {
return healthstatus.HealthResult{}, fmt.Errorf("audit backend is unhealthy: %s", t.health.Error())
}

return healthstatus.HealthResult{
Status: healthstatus.HealthStatusHealthy,
Message: "audit backend is healthy",
}, nil
}
13 changes: 5 additions & 8 deletions auditing/auditing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os"
"path/filepath"
"time"

"github.com/metal-stack/metal-lib/pkg/healthstatus"
)

type Config struct {
Expand All @@ -15,14 +17,6 @@ type Config struct {
IndexTimeout time.Duration
}

type Interval string

var (
HourlyInterval Interval = "@hourly"
DailyInterval Interval = "@daily"
MonthlyInterval Interval = "@monthly"
)

type EntryType string

const (
Expand Down Expand Up @@ -135,6 +129,9 @@ type Auditing interface {
// By default only recent entries will be returned.
// The returned entries will be sorted by timestamp in descending order.
Search(context.Context, EntryFilter) ([]Entry, error)

// Implements the health check interface
healthstatus.HealthCheck
}

func defaultComponent() (string, error) {
Expand Down
18 changes: 17 additions & 1 deletion auditing/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/metal-stack/metal-lib/pkg/healthstatus"
)

const (
MemoryBackendName = "memory"
)

type (
Expand Down Expand Up @@ -43,7 +48,7 @@ func NewMemory(c Config, mc MemoryConfig) (Auditing, error) {

a := &memoryAuditing{
component: c.Component,
log: c.Log.WithGroup("auditing").With("audit-backend", "memory"),
log: c.Log.WithGroup("auditing").With("audit-backend", MemoryBackendName),
memory: []Entry{},
config: &mc,
}
Expand Down Expand Up @@ -201,3 +206,14 @@ func (a *memoryAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entr

return entries, nil
}

func (a *memoryAuditing) ServiceName() string {
return MemoryBackendName
}

func (a *memoryAuditing) Check(ctx context.Context) (healthstatus.HealthResult, error) {
return healthstatus.HealthResult{
Message: fmt.Sprintf("audit backend %q is healthy", MemoryBackendName),
Status: healthstatus.HealthStatusHealthy,
}, nil
}
15 changes: 15 additions & 0 deletions auditing/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"

"github.com/metal-stack/metal-lib/auditing"
"github.com/metal-stack/metal-lib/pkg/healthstatus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -31,4 +33,17 @@ func TestAuditing_Memory(t *testing.T) {
tt.t(t, auditing)
})
}

auditing, err := auditing.NewMemory(auditing.Config{
Log: slog.Default(),
}, auditing.MemoryConfig{})
require.NoError(t, err)

healthResult, err := auditing.Check(t.Context())

require.NoError(t, err)
assert.Equal(t, healthstatus.HealthResult{
Status: healthstatus.HealthStatusHealthy,
Message: `audit backend "memory" is healthy`,
}, healthResult)
}
93 changes: 84 additions & 9 deletions auditing/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"time"

"github.com/metal-stack/metal-lib/pkg/healthstatus"
)

const spunkIndexTimeout = 5 * time.Second
const (
SplunkBackendName = "splunk"

spunkIndexTimeout = 5 * time.Second
// See on their docs in troubleshoot-http-event-collector (Possible_error_codes)
splunkHealthyCode = 17
)

type (
SplunkConfig struct {
Expand Down Expand Up @@ -52,6 +61,12 @@ type (
// Event is the actual event data in whatever format you want: a string, a number, another JSON object, and so on.
Event Entry `json:"event,omitempty"`
}

splunkRequestEndpoint struct {
path string
method string
body []byte
}
)

// NewSplunk returns a new auditing backend for splunk. It supports the HTTP event collector interface.
Expand Down Expand Up @@ -91,7 +106,7 @@ func NewSplunk(c Config, sc SplunkConfig) (Auditing, error) {

a := &splunkAuditing{
component: c.Component,
log: c.Log.WithGroup("auditing").With("audit-backend", "splunk"),
log: c.Log.WithGroup("auditing").With("audit-backend", SplunkBackendName),
indexTimeout: c.IndexTimeout,
client: &http.Client{Transport: &http.Transport{TLSClientConfig: sc.TlsConfig}},
endpoint: endpoint,
Expand Down Expand Up @@ -128,24 +143,84 @@ func (a *splunkAuditing) Index(entry Entry) error {
ctx, cancel := context.WithTimeout(context.Background(), a.indexTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.endpoint+"/services/collector", bytes.NewBuffer(e))
_, err = a.splunkRequest(ctx, splunkRequestEndpoint{
path: "/services/collector",
method: http.MethodPost,
body: e,
})
if err != nil {
return err
return fmt.Errorf("error indexing audit entry in splunk: %w", err)
}

return nil
}

func (a *splunkAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) {
return nil, fmt.Errorf("search not implemented for splunk audit backend")
}

func (a *splunkAuditing) ServiceName() string {
return SplunkBackendName
}

func (a *splunkAuditing) Check(ctx context.Context) (healthstatus.HealthResult, error) {
resp, err := a.splunkRequest(ctx, splunkRequestEndpoint{
path: "/services/collector/health",
method: http.MethodGet,
body: nil,
})
if err != nil {
return healthstatus.HealthResult{}, fmt.Errorf("audit backend %q is unhealthy, collector is unhealthy: %w", SplunkBackendName, err)
}

type healthResp struct {
Text string `json:"text"`
Code int `json:"code"`
}

health := healthResp{}

if err := json.Unmarshal(resp, &health); err != nil {
return healthstatus.HealthResult{}, fmt.Errorf("unable to unmarshal health response: %w", err)
}

if health.Code != splunkHealthyCode {
return healthstatus.HealthResult{}, fmt.Errorf("audit backend %q is degraded: %s", SplunkBackendName, health.Text)
}

return healthstatus.HealthResult{
Message: fmt.Sprintf("audit backend %q is healthy: %s", SplunkBackendName, health.Text),
Status: healthstatus.HealthStatusHealthy,
}, nil
}

func (a *splunkAuditing) splunkRequest(ctx context.Context, ep splunkRequestEndpoint) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, a.indexTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, ep.method, a.endpoint+ep.path, bytes.NewBuffer(ep.body))
if err != nil {
return nil, err
}

req.Header.Add("Authorization", "Splunk "+a.hecToken)

resp, err := a.client.Do(req)
if err != nil {
return fmt.Errorf("error indexing audit entry in splunk: %w", err)
return nil, fmt.Errorf("error during splunk request: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()

return nil
}
bytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response: %w", err)
}

func (a *splunkAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) {
return nil, fmt.Errorf("search not implemented for splunk audit backend")
if code := resp.StatusCode; code >= http.StatusBadRequest {
return nil, fmt.Errorf("splunk endpoint %q did not return ok (%d)", ep.path, code)
}

return bytes, nil
}
42 changes: 42 additions & 0 deletions auditing/splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/metal-stack/metal-lib/pkg/healthstatus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -101,3 +102,44 @@ func Test_splunkAuditing_Index(t *testing.T) {
})
}
}

func Test_splunkAuditing_Health(t *testing.T) {
tests := []struct {
name string
want healthstatus.HealthResult
}{
{
name: "healthy",
want: healthstatus.HealthResult{
Status: healthstatus.HealthStatusHealthy,
Message: `audit backend "splunk" is healthy: HEC is healthy`,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/services/collector/health", func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"text":"HEC is healthy","code":17}`))
w.WriteHeader(http.StatusOK)
})
server := httptest.NewServer(mux)
defer server.Close()

a, err := NewSplunk(Config{
Component: "metal-lib",
Log: slog.Default(),
}, SplunkConfig{
Endpoint: server.URL,
HECToken: "test-hec",
Index: "test-index",
Host: "test-host",
})
require.NoError(t, err)

got, err := a.Check(t.Context())
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}
Loading