Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions core/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,7 @@ func (b *Bgpipe) AttachStages() error {
})
}

if err := b.attachHTTPStages(); err != nil {
return err
}
b.attachHTTPStages()

return nil
}
Expand Down
19 changes: 10 additions & 9 deletions core/bgpipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ type Bgpipe struct {
Ctx context.Context
Cancel context.CancelCauseFunc

F *pflag.FlagSet // global flags
K *koanf.Koanf // global config
Pipe *pipe.Pipe // bgpfix pipe
Stages []*StageBase // pipe stages
HTTP *http.Server // optional shared HTTP server
StartTime time.Time // when the pipeline started

repo map[string]NewStage // maps cmd to new stage func
httpmux *chi.Mux // shared HTTP routes
F *pflag.FlagSet // global flags
K *koanf.Koanf // global config
Pipe *pipe.Pipe // bgpfix pipe
Stages []*StageBase // pipe stages
HTTP *http.Server // optional shared HTTP server
StartTime time.Time // when the pipeline started

repo map[string]NewStage // maps cmd to new stage func
httpmux *chi.Mux // shared HTTP routes
httppprof bool // true if pprof mounted on --http

wg_lwrite sync.WaitGroup // stages that write to pipe L
wg_lread sync.WaitGroup // stages that read from pipe L
Expand Down
4 changes: 3 additions & 1 deletion core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (b *Bgpipe) addFlags() {
f.BoolP("explain", "n", false, "print the pipeline as configured and quit")
f.StringP("log", "l", "info", "log level (debug/info/warn/error/disabled)")
f.String("http", "", "bind HTTP API + Prometheus /metrics to given address")
f.Bool("pprof", false, "enable pprof at /debug/pprof/ (requires --http)")
f.String("http-auth", "", "HTTP Basic Auth credentials (user:pass, $ENV, or /path)")
f.Bool("http-open", false, "disable HTTP authentication (dangerous)")
f.String("pprof", "", "enable pprof: 'http' to add to --http, or addr for separate server")
Comment on lines 66 to +69
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--pprof changed from a boolean to a string ("http" vs separate address). There are existing docs in the repo that describe --pprof as a boolean flag; consider updating documentation/help text wherever it references the old behavior to avoid user confusion.

Copilot uses AI. Check for mistakes.
f.StringSliceP("events", "e", []string{"PARSE", "ESTABLISHED", "EOR"}, "log given events (\"all\" means all events)")
f.StringSliceP("kill", "k", nil, "kill session on any of these events")
f.BoolP("stdin", "i", false, "read JSON from stdin")
Expand Down
183 changes: 145 additions & 38 deletions core/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package core
import (
"bytes"
"context"
"crypto/subtle"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"html"
"net"
"net/http"
"net/http/pprof"
"os"
"strings"
"time"

Expand All @@ -26,6 +29,37 @@ func (b *Bgpipe) configureHTTP() error {
}

m := chi.NewRouter()

// auth middleware (nil when no auth configured)
mw, err := b.httpAuthMiddleware()
if err != nil {
return err
}
if mw != nil {
m.Use(mw)
}

// fixed routes
m.Get("/metrics", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
vmmetrics.WritePrometheus(w, true)
})
m.Get("/hc", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"status": "ok",
"version": b.Version,
"stages": b.StageCount(),
"uptime": time.Since(b.StartTime).Truncate(time.Second).String(),
})
})
m.Get("/", b.httpDashboard)

// pprof
if err := b.configurePprof(m); err != nil {
return err
}

b.httpmux = m
b.HTTP = &http.Server{
Addr: addr,
Expand All @@ -36,6 +70,110 @@ func (b *Bgpipe) configureHTTP() error {
return nil
}

// httpAuthMiddleware returns middleware enforcing --http-auth.
// Returns nil if --http-open is set. Returns error if --http-auth is missing.
func (b *Bgpipe) httpAuthMiddleware() (func(http.Handler) http.Handler, error) {
if b.K.Bool("http-open") {
return nil, nil
}

authStr := strings.TrimSpace(b.K.String("http-auth"))
if authStr == "" {
return nil, fmt.Errorf("--http requires --http-auth (or --http-open to disable auth)")
}

cred, err := b.readCredential(authStr)
if err != nil {
return nil, fmt.Errorf("--http-auth: %w", err)
}
expected := []byte("Basic " + base64.StdEncoding.EncodeToString(cred))
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if subtle.ConstantTimeCompare([]byte(r.Header.Get("Authorization")), expected) != 1 {
w.Header().Set("WWW-Authenticate", `Basic realm="bgpipe"`)
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
Comment on lines +80 to +95
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

httpAuthMiddleware accepts --http-auth values that don’t look like Basic Auth credentials (e.g., missing user:pass, empty $ENV), which can silently misconfigure the API into being effectively inaccessible. Consider validating that the resolved credential is non-empty and contains a : (and possibly trimming whitespace) before building the expected Authorization header.

Copilot uses AI. Check for mistakes.
}
next.ServeHTTP(w, r)
})
}, nil
}

// readCredential reads a credential from a string, $ENV variable, or /path.
// The resolved value must be non-empty and contain a colon (user:pass).
func (b *Bgpipe) readCredential(v string) ([]byte, error) {
var cred []byte
switch {
case len(v) > 1 && v[0] == '$':
val := os.Getenv(v[1:])
if val == "" {
return nil, fmt.Errorf("environment variable %s is empty or unset", v)
}
cred = bytes.TrimSpace([]byte(val))
case len(v) > 0 && v[0] == '/':
fh, err := os.Open(v)
if err != nil {
return nil, err
}
buf := make([]byte, 128)
n, err := fh.Read(buf)
fh.Close()
if err != nil {
return nil, fmt.Errorf("file %s: %w", v, err)
}
line, _, _ := bytes.Cut(buf[:n], []byte{'\n'})
Comment on lines +114 to +124
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading credentials from a file treats io.EOF as a hard error. For typical small credential files, the first Read will often return (n>0, err=io.EOF), causing --http-auth file mode to fail. Treat io.EOF as success when n>0 (or use io.ReadAll / bufio.Reader ReadString('\n') and always close via defer).

Suggested change
fh, err := os.Open(v)
if err != nil {
return nil, err
}
buf := make([]byte, 128)
n, err := fh.Read(buf)
fh.Close()
if err != nil {
return nil, fmt.Errorf("file %s: %w", v, err)
}
line, _, _ := bytes.Cut(buf[:n], []byte{'\n'})
buf, err := os.ReadFile(v)
if err != nil {
return nil, fmt.Errorf("file %s: %w", v, err)
}
line, _, _ := bytes.Cut(buf, []byte{'\n'})

Copilot uses AI. Check for mistakes.
cred = bytes.TrimSpace(line)
default:
cred = []byte(v)
}
if len(cred) == 0 {
return nil, fmt.Errorf("credential is empty")
}
if !bytes.Contains(cred, []byte{':'}) {
return nil, fmt.Errorf("credential must be in user:pass format")
}
return cred, nil
}

func (b *Bgpipe) configurePprof(m *chi.Mux) error {
pprofVal := strings.TrimSpace(b.K.String("pprof"))
if pprofVal == "" {
return nil
}

// separate pprof server? overwrite m with a fresh mux
if pprofVal != "http" {
m = chi.NewMux()
}

m.HandleFunc("/debug/pprof/*", pprof.Index)
m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.HandleFunc("/debug/pprof/trace", pprof.Trace)

if pprofVal == "http" {
b.httppprof = true
return nil
}

// start separate pprof server (no auth)
ln, err := net.Listen("tcp", pprofVal)
if err != nil {
return fmt.Errorf("could not bind --pprof %s: %w", pprofVal, err)
}

go func() {
srv := &http.Server{Handler: m, ReadHeaderTimeout: 5 * time.Second}
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
b.Warn().Err(err).Msg("pprof server error")
}
}()

b.Info().Msgf("pprof: http://%s/debug/pprof/", ln.Addr())
return nil
}

func (b *Bgpipe) startHTTP() error {
if b.HTTP == nil {
return nil
Expand All @@ -54,7 +192,7 @@ func (b *Bgpipe) startHTTP() error {
b.Cancel(fmt.Errorf("http server failed: %w", err))
}()

b.Info().Str("addr", ln.Addr().String()).Msg("HTTP API listening")
b.Info().Msgf("HTTP API: http://%s/", ln.Addr())
return nil
}

Expand All @@ -70,23 +208,24 @@ func (b *Bgpipe) stopHTTP() {
}
}

func (b *Bgpipe) attachHTTPStages() error {
// attachHTTPStages mounts per-stage HTTP routes on the shared mux.
func (b *Bgpipe) attachHTTPStages() {
if b.httpmux == nil {
return nil
return
}

m := b.httpmux
used := make(map[string]struct{})

// mount per-stage routes
for _, s := range b.Stages {
if s == nil {
continue
}

r := chi.NewRouter()
if err := s.Stage.RouteHTTP(r); err != nil {
return s.Errorf("could not register HTTP API: %w", err)
s.Warn().Err(err).Msg("could not register HTTP API")
continue
}
Comment on lines 225 to 229
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attachHTTPStages used to fail the pipeline setup if a stage’s RouteHTTP returned an error; it now only logs a warning and continues. If stage HTTP endpoints are expected to be reliable/mandatory, this can hide misconfigurations until runtime. Consider either keeping the original fail-fast behavior, or making the choice explicit via a flag/config so operators can decide.

Copilot uses AI. Check for mistakes.
if len(r.Routes()) == 0 {
continue
Expand All @@ -103,38 +242,6 @@ func (b *Bgpipe) attachHTTPStages() error {

s.Info().Str("http", s.HTTPPath).Msg("stage HTTP API mounted")
}

// GET /metrics — Prometheus
m.Get("/metrics", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
vmmetrics.WritePrometheus(w, true)
})

// GET /hc — k8s health check
m.Get("/hc", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"status": "ok",
"version": b.Version,
"stages": b.StageCount(),
"uptime": time.Since(b.StartTime).Truncate(time.Second).String(),
})
})

// GET / — web dashboard
m.Get("/", b.httpDashboard)

// pprof?
if b.K.Bool("pprof") {
m.HandleFunc("/debug/pprof/", pprof.Index)
m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.HandleFunc("/debug/pprof/trace", pprof.Trace)
b.Info().Msg("pprof enabled at /debug/pprof/")
}

return nil
}

func (b *Bgpipe) httpDashboard(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -248,7 +355,7 @@ func (b *Bgpipe) httpDashboard(w http.ResponseWriter, r *http.Request) {
<a href="/metrics">Prometheus Metrics</a>
<a href="/hc">Health Check</a>`)

if b.K.Bool("pprof") {
if b.httppprof {
fmt.Fprintf(&buf, `
<a href="/debug/pprof/">pprof</a>`)
}
Expand Down
Loading
Loading