Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions core/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,7 @@ func (b *Bgpipe) AttachStages() error {
})
}

if err := b.attachHTTPStages(); err != nil {
return err
}
b.attachHTTPStages()

return nil
}
Expand Down
19 changes: 10 additions & 9 deletions core/bgpipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ type Bgpipe struct {
Ctx context.Context
Cancel context.CancelCauseFunc

F *pflag.FlagSet // global flags
K *koanf.Koanf // global config
Pipe *pipe.Pipe // bgpfix pipe
Stages []*StageBase // pipe stages
HTTP *http.Server // optional shared HTTP server
StartTime time.Time // when the pipeline started

repo map[string]NewStage // maps cmd to new stage func
httpmux *chi.Mux // shared HTTP routes
F *pflag.FlagSet // global flags
K *koanf.Koanf // global config
Pipe *pipe.Pipe // bgpfix pipe
Stages []*StageBase // pipe stages
HTTP *http.Server // optional shared HTTP server
StartTime time.Time // when the pipeline started

repo map[string]NewStage // maps cmd to new stage func
httpmux *chi.Mux // shared HTTP routes
httppprof bool // true if pprof mounted on --http

wg_lwrite sync.WaitGroup // stages that write to pipe L
wg_lread sync.WaitGroup // stages that read from pipe L
Expand Down
4 changes: 3 additions & 1 deletion core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (b *Bgpipe) addFlags() {
f.BoolP("explain", "n", false, "print the pipeline as configured and quit")
f.StringP("log", "l", "info", "log level (debug/info/warn/error/disabled)")
f.String("http", "", "bind HTTP API + Prometheus /metrics to given address")
f.Bool("pprof", false, "enable pprof at /debug/pprof/ (requires --http)")
f.String("http-auth", "", "HTTP Basic Auth credentials (user:pass, $ENV, or /path)")
f.Bool("http-open", false, "disable HTTP authentication (dangerous)")
f.String("pprof", "", "enable pprof: 'http' to add to --http, or addr for separate server")
f.StringSliceP("events", "e", []string{"PARSE", "ESTABLISHED", "EOR"}, "log given events (\"all\" means all events)")
f.StringSliceP("kill", "k", nil, "kill session on any of these events")
f.BoolP("stdin", "i", false, "read JSON from stdin")
Expand Down
177 changes: 139 additions & 38 deletions core/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package core
import (
"bytes"
"context"
"crypto/subtle"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"html"
"net"
"net/http"
"net/http/pprof"
"os"
"strings"
"time"

Expand All @@ -26,6 +29,37 @@ func (b *Bgpipe) configureHTTP() error {
}

m := chi.NewRouter()

// auth middleware (nil when no auth configured)
mw, err := b.httpAuthMiddleware()
if err != nil {
return err
}
if mw != nil {
m.Use(mw)
}

// fixed routes
m.Get("/metrics", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
vmmetrics.WritePrometheus(w, true)
})
m.Get("/hc", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"status": "ok",
"version": b.Version,
"stages": b.StageCount(),
"uptime": time.Since(b.StartTime).Truncate(time.Second).String(),
})
})
m.Get("/", b.httpDashboard)

// pprof
if err := b.configurePprof(m); err != nil {
return err
}

b.httpmux = m
b.HTTP = &http.Server{
Addr: addr,
Expand All @@ -36,6 +70,104 @@ func (b *Bgpipe) configureHTTP() error {
return nil
}

// httpAuthMiddleware returns middleware enforcing --http-auth.
// Returns nil if --http-open is set. Returns error if --http-auth is missing.
func (b *Bgpipe) httpAuthMiddleware() (func(http.Handler) http.Handler, error) {
if b.K.Bool("http-open") {
return nil, nil
}

authStr := strings.TrimSpace(b.K.String("http-auth"))
if authStr == "" {
return nil, fmt.Errorf("--http requires --http-auth (or --http-open to disable auth)")
}

cred, err := b.readCredential(authStr)
if err != nil {
return nil, fmt.Errorf("--http-auth: %w", err)
}
expected := []byte("Basic " + base64.StdEncoding.EncodeToString(cred))
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if subtle.ConstantTimeCompare([]byte(r.Header.Get("Authorization")), expected) != 1 {
w.Header().Set("WWW-Authenticate", `Basic realm="bgpipe"`)
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
next.ServeHTTP(w, r)
})
}, nil
}

// readCredential reads a credential from a string, $ENV variable, or /path.
// The resolved value must be non-empty and contain a colon (user:pass).
func (b *Bgpipe) readCredential(v string) ([]byte, error) {
var cred []byte
switch {
case len(v) > 1 && v[0] == '$':
val := os.Getenv(v[1:])
if val == "" {
return nil, fmt.Errorf("environment variable %s is empty or unset", v)
}
cred = bytes.TrimSpace([]byte(val))
case len(v) > 0 && v[0] == '/':
buf, err := os.ReadFile(v)
if err != nil {
return nil, fmt.Errorf("file %s: %w", v, err)
}
line, _, _ := bytes.Cut(buf, []byte{'\n'})
cred = bytes.TrimSpace(line)
default:
cred = []byte(v)
}
if len(cred) == 0 {
return nil, fmt.Errorf("credential is empty")
}
if !bytes.Contains(cred, []byte{':'}) {
return nil, fmt.Errorf("credential must be in user:pass format")
}
return cred, nil
}

func (b *Bgpipe) configurePprof(m *chi.Mux) error {
pprofVal := strings.TrimSpace(b.K.String("pprof"))
if pprofVal == "" {
return nil
}

// separate pprof server? overwrite m with a fresh mux
if pprofVal != "http" {
m = chi.NewMux()
}

m.HandleFunc("/debug/pprof/*", pprof.Index)
m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.HandleFunc("/debug/pprof/trace", pprof.Trace)

if pprofVal == "http" {
b.httppprof = true
return nil
}

// start separate pprof server (no auth)
ln, err := net.Listen("tcp", pprofVal)
if err != nil {
return fmt.Errorf("could not bind --pprof %s: %w", pprofVal, err)
}

go func() {
srv := &http.Server{Handler: m, ReadHeaderTimeout: 5 * time.Second}
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
b.Warn().Err(err).Msg("pprof server error")
}
}()

b.Info().Msgf("pprof: http://%s/debug/pprof/", ln.Addr())
return nil
}

func (b *Bgpipe) startHTTP() error {
if b.HTTP == nil {
return nil
Expand All @@ -54,7 +186,7 @@ func (b *Bgpipe) startHTTP() error {
b.Cancel(fmt.Errorf("http server failed: %w", err))
}()

b.Info().Str("addr", ln.Addr().String()).Msg("HTTP API listening")
b.Info().Msgf("HTTP API: http://%s/", ln.Addr())
return nil
}

Expand All @@ -70,23 +202,24 @@ func (b *Bgpipe) stopHTTP() {
}
}

func (b *Bgpipe) attachHTTPStages() error {
// attachHTTPStages mounts per-stage HTTP routes on the shared mux.
func (b *Bgpipe) attachHTTPStages() {
if b.httpmux == nil {
return nil
return
}

m := b.httpmux
used := make(map[string]struct{})

// mount per-stage routes
for _, s := range b.Stages {
if s == nil {
continue
}

r := chi.NewRouter()
if err := s.Stage.RouteHTTP(r); err != nil {
return s.Errorf("could not register HTTP API: %w", err)
s.Warn().Err(err).Msg("could not register HTTP API")
continue
}
if len(r.Routes()) == 0 {
continue
Expand All @@ -103,38 +236,6 @@ func (b *Bgpipe) attachHTTPStages() error {

s.Info().Str("http", s.HTTPPath).Msg("stage HTTP API mounted")
}

// GET /metrics — Prometheus
m.Get("/metrics", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
vmmetrics.WritePrometheus(w, true)
})

// GET /hc — k8s health check
m.Get("/hc", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"status": "ok",
"version": b.Version,
"stages": b.StageCount(),
"uptime": time.Since(b.StartTime).Truncate(time.Second).String(),
})
})

// GET / — web dashboard
m.Get("/", b.httpDashboard)

// pprof?
if b.K.Bool("pprof") {
m.HandleFunc("/debug/pprof/", pprof.Index)
m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.HandleFunc("/debug/pprof/trace", pprof.Trace)
b.Info().Msg("pprof enabled at /debug/pprof/")
}

return nil
}

func (b *Bgpipe) httpDashboard(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -248,7 +349,7 @@ func (b *Bgpipe) httpDashboard(w http.ResponseWriter, r *http.Request) {
<a href="/metrics">Prometheus Metrics</a>
<a href="/hc">Health Check</a>`)

if b.K.Bool("pprof") {
if b.httppprof {
fmt.Fprintf(&buf, `
<a href="/debug/pprof/">pprof</a>`)
}
Expand Down
Loading
Loading