diff --git a/core/attach.go b/core/attach.go index 5fa9baa..bcf3db6 100644 --- a/core/attach.go +++ b/core/attach.go @@ -124,9 +124,7 @@ func (b *Bgpipe) AttachStages() error { }) } - if err := b.attachHTTPStages(); err != nil { - return err - } + b.attachHTTPStages() return nil } diff --git a/core/bgpipe.go b/core/bgpipe.go index b68672a..0db5560 100644 --- a/core/bgpipe.go +++ b/core/bgpipe.go @@ -32,15 +32,16 @@ type Bgpipe struct { Ctx context.Context Cancel context.CancelCauseFunc - F *pflag.FlagSet // global flags - K *koanf.Koanf // global config - Pipe *pipe.Pipe // bgpfix pipe - Stages []*StageBase // pipe stages - HTTP *http.Server // optional shared HTTP server - StartTime time.Time // when the pipeline started - - repo map[string]NewStage // maps cmd to new stage func - httpmux *chi.Mux // shared HTTP routes + F *pflag.FlagSet // global flags + K *koanf.Koanf // global config + Pipe *pipe.Pipe // bgpfix pipe + Stages []*StageBase // pipe stages + HTTP *http.Server // optional shared HTTP server + StartTime time.Time // when the pipeline started + + repo map[string]NewStage // maps cmd to new stage func + httpmux *chi.Mux // shared HTTP routes + httppprof bool // true if pprof mounted on --http wg_lwrite sync.WaitGroup // stages that write to pipe L wg_lread sync.WaitGroup // stages that read from pipe L diff --git a/core/config.go b/core/config.go index edb7554..9414241 100644 --- a/core/config.go +++ b/core/config.go @@ -64,7 +64,9 @@ func (b *Bgpipe) addFlags() { f.BoolP("explain", "n", false, "print the pipeline as configured and quit") f.StringP("log", "l", "info", "log level (debug/info/warn/error/disabled)") f.String("http", "", "bind HTTP API + Prometheus /metrics to given address") - f.Bool("pprof", false, "enable pprof at /debug/pprof/ (requires --http)") + f.String("http-auth", "", "HTTP Basic Auth credentials (user:pass, $ENV, or /path)") + f.Bool("http-open", false, "disable HTTP authentication (dangerous)") + f.String("pprof", "", "enable pprof: 'http' to add to --http, or addr for separate server") f.StringSliceP("events", "e", []string{"PARSE", "ESTABLISHED", "EOR"}, "log given events (\"all\" means all events)") f.StringSliceP("kill", "k", nil, "kill session on any of these events") f.BoolP("stdin", "i", false, "read JSON from stdin") diff --git a/core/http.go b/core/http.go index e85b12b..af75b0c 100644 --- a/core/http.go +++ b/core/http.go @@ -3,6 +3,8 @@ package core import ( "bytes" "context" + "crypto/subtle" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -10,6 +12,7 @@ import ( "net" "net/http" "net/http/pprof" + "os" "strings" "time" @@ -26,6 +29,37 @@ func (b *Bgpipe) configureHTTP() error { } m := chi.NewRouter() + + // auth middleware (nil when no auth configured) + mw, err := b.httpAuthMiddleware() + if err != nil { + return err + } + if mw != nil { + m.Use(mw) + } + + // fixed routes + m.Get("/metrics", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + vmmetrics.WritePrometheus(w, true) + }) + m.Get("/hc", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]any{ + "status": "ok", + "version": b.Version, + "stages": b.StageCount(), + "uptime": time.Since(b.StartTime).Truncate(time.Second).String(), + }) + }) + m.Get("/", b.httpDashboard) + + // pprof + if err := b.configurePprof(m); err != nil { + return err + } + b.httpmux = m b.HTTP = &http.Server{ Addr: addr, @@ -36,6 +70,110 @@ func (b *Bgpipe) configureHTTP() error { return nil } +// httpAuthMiddleware returns middleware enforcing --http-auth. +// Returns nil if --http-open is set. Returns error if --http-auth is missing. +func (b *Bgpipe) httpAuthMiddleware() (func(http.Handler) http.Handler, error) { + if b.K.Bool("http-open") { + return nil, nil + } + + authStr := strings.TrimSpace(b.K.String("http-auth")) + if authStr == "" { + return nil, fmt.Errorf("--http requires --http-auth (or --http-open to disable auth)") + } + + cred, err := b.readCredential(authStr) + if err != nil { + return nil, fmt.Errorf("--http-auth: %w", err) + } + expected := []byte("Basic " + base64.StdEncoding.EncodeToString(cred)) + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if subtle.ConstantTimeCompare([]byte(r.Header.Get("Authorization")), expected) != 1 { + w.Header().Set("WWW-Authenticate", `Basic realm="bgpipe"`) + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + next.ServeHTTP(w, r) + }) + }, nil +} + +// readCredential reads a credential from a string, $ENV variable, or /path. +// The resolved value must be non-empty and contain a colon (user:pass). +func (b *Bgpipe) readCredential(v string) ([]byte, error) { + var cred []byte + switch { + case len(v) > 1 && v[0] == '$': + val := os.Getenv(v[1:]) + if val == "" { + return nil, fmt.Errorf("environment variable %s is empty or unset", v) + } + cred = bytes.TrimSpace([]byte(val)) + case len(v) > 0 && v[0] == '/': + fh, err := os.Open(v) + if err != nil { + return nil, err + } + buf := make([]byte, 128) + n, err := fh.Read(buf) + fh.Close() + if err != nil { + return nil, fmt.Errorf("file %s: %w", v, err) + } + line, _, _ := bytes.Cut(buf[:n], []byte{'\n'}) + cred = bytes.TrimSpace(line) + default: + cred = []byte(v) + } + if len(cred) == 0 { + return nil, fmt.Errorf("credential is empty") + } + if !bytes.Contains(cred, []byte{':'}) { + return nil, fmt.Errorf("credential must be in user:pass format") + } + return cred, nil +} + +func (b *Bgpipe) configurePprof(m *chi.Mux) error { + pprofVal := strings.TrimSpace(b.K.String("pprof")) + if pprofVal == "" { + return nil + } + + // separate pprof server? overwrite m with a fresh mux + if pprofVal != "http" { + m = chi.NewMux() + } + + m.HandleFunc("/debug/pprof/*", pprof.Index) + m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + m.HandleFunc("/debug/pprof/profile", pprof.Profile) + m.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + m.HandleFunc("/debug/pprof/trace", pprof.Trace) + + if pprofVal == "http" { + b.httppprof = true + return nil + } + + // start separate pprof server (no auth) + ln, err := net.Listen("tcp", pprofVal) + if err != nil { + return fmt.Errorf("could not bind --pprof %s: %w", pprofVal, err) + } + + go func() { + srv := &http.Server{Handler: m, ReadHeaderTimeout: 5 * time.Second} + if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { + b.Warn().Err(err).Msg("pprof server error") + } + }() + + b.Info().Msgf("pprof: http://%s/debug/pprof/", ln.Addr()) + return nil +} + func (b *Bgpipe) startHTTP() error { if b.HTTP == nil { return nil @@ -54,7 +192,7 @@ func (b *Bgpipe) startHTTP() error { b.Cancel(fmt.Errorf("http server failed: %w", err)) }() - b.Info().Str("addr", ln.Addr().String()).Msg("HTTP API listening") + b.Info().Msgf("HTTP API: http://%s/", ln.Addr()) return nil } @@ -70,15 +208,15 @@ func (b *Bgpipe) stopHTTP() { } } -func (b *Bgpipe) attachHTTPStages() error { +// attachHTTPStages mounts per-stage HTTP routes on the shared mux. +func (b *Bgpipe) attachHTTPStages() { if b.httpmux == nil { - return nil + return } m := b.httpmux used := make(map[string]struct{}) - // mount per-stage routes for _, s := range b.Stages { if s == nil { continue @@ -86,7 +224,8 @@ func (b *Bgpipe) attachHTTPStages() error { r := chi.NewRouter() if err := s.Stage.RouteHTTP(r); err != nil { - return s.Errorf("could not register HTTP API: %w", err) + s.Warn().Err(err).Msg("could not register HTTP API") + continue } if len(r.Routes()) == 0 { continue @@ -103,38 +242,6 @@ func (b *Bgpipe) attachHTTPStages() error { s.Info().Str("http", s.HTTPPath).Msg("stage HTTP API mounted") } - - // GET /metrics — Prometheus - m.Get("/metrics", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") - vmmetrics.WritePrometheus(w, true) - }) - - // GET /hc — k8s health check - m.Get("/hc", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]any{ - "status": "ok", - "version": b.Version, - "stages": b.StageCount(), - "uptime": time.Since(b.StartTime).Truncate(time.Second).String(), - }) - }) - - // GET / — web dashboard - m.Get("/", b.httpDashboard) - - // pprof? - if b.K.Bool("pprof") { - m.HandleFunc("/debug/pprof/", pprof.Index) - m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - m.HandleFunc("/debug/pprof/profile", pprof.Profile) - m.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - m.HandleFunc("/debug/pprof/trace", pprof.Trace) - b.Info().Msg("pprof enabled at /debug/pprof/") - } - - return nil } func (b *Bgpipe) httpDashboard(w http.ResponseWriter, r *http.Request) { @@ -248,7 +355,7 @@ func (b *Bgpipe) httpDashboard(w http.ResponseWriter, r *http.Request) { Prometheus Metrics Health Check`) - if b.K.Bool("pprof") { + if b.httppprof { fmt.Fprintf(&buf, ` pprof`) } diff --git a/docs/stages/rpki.md b/docs/stages/rpki.md index 4bce175..26645ed 100644 --- a/docs/stages/rpki.md +++ b/docs/stages/rpki.md @@ -1,6 +1,6 @@ # rpki -Validate UPDATE messages using RPKI. +Validate UPDATE messages using RPKI (ROV + ASPA). ## Synopsis @@ -11,40 +11,86 @@ bgpipe [...] -- rpki [OPTIONS] ## Description The **rpki** stage validates BGP UPDATE messages against RPKI (Resource Public -Key Infrastructure) data. It checks whether the origin AS is authorized to -announce each prefix, based on ROA (Route Origin Authorization) records. +Key Infrastructure) data. It performs ROV by default, and optionally ASPA when +`--aspa` is set. -Each prefix in an UPDATE is assigned one of three RPKI validation states: +**ROV (Route Origin Validation, +[RFC 6811](https://datatracker.ietf.org/doc/html/rfc6811))** checks whether the +origin AS is authorized to announce each prefix, using VRPs (Validated ROA +Payloads) received from an RPKI cache server or loaded from a file. Each prefix +is assigned one of three states: -- **VALID** - a ROA exists and matches the origin AS and prefix length -- **INVALID** - a ROA exists but the origin AS or prefix length does not match -- **NOT_FOUND** - no ROA covers this prefix +- **VALID** -- a VRP covers this prefix with a matching origin AS and maxLength +- **INVALID** -- a VRP exists for the prefix but the origin AS or length does not match +- **NOT_FOUND** -- no VRP covers this prefix -The stage obtains ROA data either from an RTR (RPKI-to-Router) server or -from a local ROA file. By default, it connects to Cloudflare's public RTR -server at `rtr.rpki.cloudflare.com:8282`. +**ASPA (Autonomous System Provider Authorization, +[draft-ietf-sidrops-aspa-verification](https://datatracker.ietf.org/doc/draft-ietf-sidrops-aspa-verification/))** detects route leaks by verifying that the AS_PATH is valley-free, +using ASPA records that attest provider-customer relationships between ASes. +Each path is assigned one of three states: -The `--invalid` option controls how INVALID prefixes are handled: +- **VALID** -- the path is valley-free with full cryptographic attestation +- **UNKNOWN** -- the path appears valley-free but some ASes lack ASPA records + (insufficient attestation, not evidence of a leak) +- **INVALID** -- the path provably violates valley-free routing (route leak) + +ASPA validation is **disabled by default** and requires `--aspa` to enable. It +also requires: + +1. ASPA records from an RTR v2 server + ([draft-ietf-sidrops-8210bis](https://datatracker.ietf.org/doc/draft-ietf-sidrops-8210bis/)) + or a JSON file with ASPA data +2. Knowledge of the peer's BGP role, either auto-detected via the + [RFC 9234](https://datatracker.ietf.org/doc/html/rfc9234) BGP Role + capability, or set explicitly with `--aspa-role` + +If `--aspa-role auto` (the default) and the peer does not send the BGP Role +capability in their OPEN message, ASPA validation is skipped for the session +and a warning is logged. Set `--aspa-role` to force ASPA validation when the +peer lacks this capability. + +ASPA also verifies that the first AS in the path matches the neighbor's ASN +(per draft-ietf-sidrops-aspa-verification, Section 5). This check is skipped +for Route Server peers (`--aspa-role rs`), as RSes do not prepend their ASN +([RFC 7947](https://datatracker.ietf.org/doc/html/rfc7947)). + +The stage obtains VRP and ASPA data either from an RTR server (supporting +RTR v0/v1/v2 with automatic version negotiation and fallback) or from a local +file. By default, it connects to Cloudflare's public RTR server at +`rtr.rpki.cloudflare.com:8282`. + +### Actions for INVALID routes + +The `--invalid` (ROV) and `--aspa-invalid` (ASPA) options control handling: + +**ROV** operates per-prefix and supports all five actions: + +| Action | Behavior | +|--------|----------| +| `withdraw` | Move invalid prefixes to withdrawn | +| `filter` | Remove invalid prefixes silently | +| `drop` | Drop the entire UPDATE message | +| `split` | Split invalid prefixes to a separate UPDATE with withdrawals | +| `keep` | Keep unchanged (tag only) | + +**ASPA** validates the entire AS_PATH (one per UPDATE), so per-prefix actions +(`filter`, `split`) do not apply. Supported actions: | Action | Behavior | |--------|----------| -| `withdraw` | Move invalid prefixes to the withdrawn list ([RFC 7606](https://datatracker.ietf.org/doc/html/rfc7606)) | -| `filter` | Remove invalid prefixes from the reachable list | -| `drop` | Drop the entire UPDATE if any prefix is invalid | -| `split` | Split invalid prefixes into a separate UPDATE that withdraws them | -| `keep` | Keep invalid prefixes unchanged (tag only) | +| `withdraw` | Move all reachable prefixes to withdrawn | +| `drop` | Drop the entire UPDATE message | +| `keep` | Keep unchanged (tag only) | -When `--tag` is enabled (the default), the stage adds `rpki/status` to -message tags, which can be used in downstream [filters](../filters.md) -(e.g., `tag[rpki/status] == INVALID`). +### Tags -With `--strict`, prefixes with NOT_FOUND status are treated the same as -INVALID. This is an aggressive policy that only allows prefixes with -explicit RPKI authorization. +When `--tag` is enabled (default), each prefix gets a per-prefix tag +(`rpki/`) and the message gets an overall `rpki/status` tag. +When `--aspa-tag` is enabled (default) and ASPA is active (`--aspa`), the +message gets `aspa/status`. +These can be used in downstream [filters](../filters.md). -The stage waits for the ROA cache to be populated before processing messages -(unless `--asap` is set), ensuring no messages are validated against an -incomplete cache. +With `--strict`, NOT_FOUND prefixes are treated the same as INVALID. ## Options @@ -52,35 +98,99 @@ incomplete cache. | Option | Type | Default | Description | |--------|------|---------|-------------| -| `--rtr` | string | `rtr.rpki.cloudflare.com:8282` | RTR server address (`host:port`) | -| `--rtr-refresh` | duration | `1h` | RTR cache refresh interval | -| `--rtr-retry` | duration | `10m` | RTR retry interval on errors | -| `--timeout` | duration | `15s` | Connect timeout; 0 disables | -| `--retry` | bool | `true` | Retry connection on errors | -| `--retry-max` | int | `0` | Max retry attempts; 0 means unlimited | -| `--tls` | bool | `false` | Connect to RTR server over TLS | +| `--rtr` | string | `rtr.rpki.cloudflare.com:8282` | RTR cache server address (`host:port`) | +| `--rtr-refresh` | duration | `1h` | Periodic Serial Query interval | +| `--rtr-retry` | duration | `10m` | Reconnection delay after failure | +| `--timeout` | duration | `15s` | TCP connect timeout (0 to disable) | +| `--retry` | bool | `true` | Retry on connection failure | +| `--retry-max` | int | `0` | Max retries (0 = unlimited) | +| `--tls` | bool | `false` | Connect over TLS | | `--insecure` | bool | `false` | Skip TLS certificate validation | -| `--no-ipv6` | bool | `false` | Avoid IPv6 when connecting to RTR server | +| `--no-ipv6` | bool | `false` | Avoid IPv6 for RTR server connection | -### ROA File +### VRP/ASPA File | Option | Type | Default | Description | |--------|------|---------|-------------| -| `--file` | string | | Use a local ROA file instead of RTR (JSON or CSV, auto-reloaded) | +| `--file` | string | | Local VRP/ASPA file (JSON or CSV, auto-reloaded every 10s) | + +JSON format (Routinator-compatible): + +```json +{ + "roas": [ + {"prefix": "192.0.2.0/24", "maxLength": 24, "asn": "AS65001"}, + {"prefix": "2001:db8::/32", "maxLength": 48, "asn": 65002} + ], + "aspas": [ + {"customer_asid": 65001, "provider_asids": [65002, 65003]} + ] +} +``` -### Validation Policy +CSV format (one VRP per line, `#` comments, optional header): + +```csv +prefix,maxLength,asn +192.0.2.0/24,24,AS65001 +2001:db8::/32,48,65002 +``` + +### ROV Policy | Option | Type | Default | Description | |--------|------|---------|-------------| -| `--invalid` | string | `withdraw` | Action for INVALID prefixes: `withdraw`, `filter`, `drop`, `split`, `keep` | -| `--strict` | bool | `false` | Treat NOT_FOUND same as INVALID | -| `--tag` | bool | `true` | Add `rpki/status` to message tags | -| `--event` | string | | Emit this event on RPKI INVALID messages | -| `--asap` | bool | `false` | Start validating before ROA cache is ready | +| `--invalid` | string | `withdraw` | Action for ROV INVALID: `withdraw`, `filter`, `drop`, `split`, `keep` | +| `--strict` | bool | `false` | Treat NOT_FOUND as INVALID | +| `--tag` | bool | `true` | Add `rpki/status` and `rpki/` tags | +| `--event` | string | | Emit named event on ROV INVALID | +| `--no-wait` | bool | `false` | Start before VRP/ASPA cache is ready | + +### ASPA Policy + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `--aspa` | bool | `false` | Enable ASPA path validation | +| `--aspa-invalid` | string | `withdraw` | Action for ASPA INVALID: `withdraw`, `drop`, `keep` | +| `--aspa-tag` | bool | `true` | Add `aspa/status` tag | +| `--aspa-event` | string | | Emit named event on ASPA INVALID | +| `--aspa-role` | string | `auto` | Peer's BGP role: `auto`, `provider`, `customer`, `peer`, `rs`, `rs-client` | + +The `--aspa-role` flag specifies the peer's BGP role (from the peer's +perspective, per [RFC 9234](https://datatracker.ietf.org/doc/html/rfc9234)): + +| `--aspa-role` value | Meaning | ASPA direction | +|---------------|---------|----------------| +| `provider` | Peer is our provider | Downstream (route came from above) | +| `rs` | Peer is a Route Server | Downstream (treated like provider) | +| `customer` | Peer is our customer | Upstream (route came from below) | +| `rs-client` | Peer is an RS client | Upstream | +| `peer` | Peer is a lateral peer | Upstream | +| `auto` | Auto-detect from BGP Role capability | Depends on detected role | + +### Prometheus Metrics + +| Metric | Type | Description | +|--------|------|-------------| +| `bgpipe_rpki_messages_total` | counter | UPDATE messages processed | +| `bgpipe_rpki_rov_valid_total` | counter | Prefixes with ROV state VALID | +| `bgpipe_rpki_rov_invalid_total` | counter | Prefixes with ROV state INVALID | +| `bgpipe_rpki_rov_not_found_total` | counter | Prefixes with ROV state NOT_FOUND | +| `bgpipe_rpki_vrps_ipv4` | gauge | IPv4 VRPs loaded | +| `bgpipe_rpki_vrps_ipv6` | gauge | IPv6 VRPs loaded | + +When `--aspa` is enabled, the following are also registered: + +| Metric | Type | Description | +|--------|------|-------------| +| `bgpipe_rpki_aspa_valid_total` | counter | Paths with ASPA state VALID | +| `bgpipe_rpki_aspa_unknown_total` | counter | Paths with ASPA state UNKNOWN | +| `bgpipe_rpki_aspa_invalid_total` | counter | Paths with ASPA state INVALID | +| `bgpipe_rpki_aspa_entries` | gauge | ASPA records loaded | ## Examples -Basic RPKI filtering between two routers (default: withdraw invalid): +Basic ROV (default: withdraw INVALID prefixes): ```bash bgpipe \ @@ -89,7 +199,16 @@ bgpipe \ -- connect 192.0.2.1 ``` -Keep invalid prefixes but tag them for downstream processing: +ROV + ASPA with explicit peer role: + +```bash +bgpipe \ + -- listen :179 \ + -- rpki --aspa --aspa-role customer \ + -- connect 192.0.2.1 +``` + +Tag only, no enforcement -- useful for monitoring: ```bash bgpipe -o \ @@ -98,7 +217,7 @@ bgpipe -o \ -- grep 'tag[rpki/status] == INVALID' ``` -Strict mode: only allow RPKI-VALID prefixes: +Strict ROV: drop any prefix without a valid VRP: ```bash bgpipe --events rpki/dropped \ @@ -107,16 +226,25 @@ bgpipe --events rpki/dropped \ -- connect 192.0.2.1 ``` -Use a local ROA file instead of RTR: +Use a local VRP/ASPA file instead of RTR: ```bash bgpipe \ -- listen :179 \ - -- rpki --file /var/lib/rpki/roas.json --invalid filter \ + -- rpki --file /var/lib/rpki/export.json --invalid filter \ -- connect 192.0.2.1 ``` -Tag with RPKI status and add a community to invalid routes: +ROV + ASPA monitoring (tag everything, enforce nothing): + +```bash +bgpipe -o \ + -- ris-live \ + -- rpki --invalid keep --aspa --aspa-invalid keep \ + -- grep 'tag[aspa/status] == INVALID' +``` + +Add a community to ROV-invalid routes instead of dropping: ```bash bgpipe \ @@ -126,7 +254,7 @@ bgpipe \ -- connect 10.0.0.1 ``` -Connect to an RTR server over TLS: +Connect to a private RTR cache over TLS: ```bash bgpipe \ @@ -140,5 +268,13 @@ bgpipe \ [limit](limit.md), [grep](grep.md), [update](update.md), -[RFC 6811 - RPKI-Based Origin Validation](https://datatracker.ietf.org/doc/html/rfc6811), [Stages overview](index.md) + +### References + +- [RFC 6811](https://datatracker.ietf.org/doc/html/rfc6811) -- RPKI-Based Route Origin Validation +- [RFC 8210](https://datatracker.ietf.org/doc/html/rfc8210) -- RPKI-to-Router Protocol (RTR v1) +- [RFC 9234](https://datatracker.ietf.org/doc/html/rfc9234) -- Route Leak Prevention and Detection Using Roles in UPDATE and OPEN Messages +- [draft-ietf-sidrops-aspa-verification](https://datatracker.ietf.org/doc/draft-ietf-sidrops-aspa-verification/) -- Verification of AS_PATH Using ASPA Objects +- [draft-ietf-sidrops-8210bis](https://datatracker.ietf.org/doc/draft-ietf-sidrops-8210bis/) -- RTR v2 (adds ASPA support) +- [RFC 7947](https://datatracker.ietf.org/doc/html/rfc7947) -- Internet Exchange BGP Route Server diff --git a/go.mod b/go.mod index b8e138f..ba52648 100644 --- a/go.mod +++ b/go.mod @@ -1,31 +1,32 @@ module github.com/bgpfix/bgpipe -go 1.24.0 +go 1.25.0 toolchain go1.26.0 require ( - github.com/VictoriaMetrics/metrics v1.41.2 - github.com/bgp/stayrtr v0.6.3 + github.com/VictoriaMetrics/metrics v1.43.1 github.com/bgpfix/bgpfix v0.18.0 - github.com/buger/jsonparser v1.1.1 + github.com/buger/jsonparser v1.1.2 github.com/dsnet/compress v0.0.2-0.20230904184137-39efe44ab707 github.com/go-chi/chi/v5 v5.2.5 github.com/gorilla/websocket v1.5.3 - github.com/klauspost/compress v1.18.4 + github.com/klauspost/compress v1.18.5 github.com/knadh/koanf/providers/posflag v1.0.1 - github.com/knadh/koanf/v2 v2.3.3 + github.com/knadh/koanf/v2 v2.3.4 github.com/puzpuzpuz/xsync/v4 v4.4.0 - github.com/rs/zerolog v1.34.0 + github.com/rs/zerolog v1.35.0 github.com/spf13/pflag v1.0.10 + github.com/stretchr/testify v1.11.1 github.com/twmb/franz-go v1.20.7 github.com/twmb/franz-go/pkg/kadm v1.17.2 github.com/valyala/bytebufferpool v1.0.0 - golang.org/x/sys v0.41.0 - golang.org/x/time v0.14.0 + golang.org/x/sys v0.42.0 + golang.org/x/time v0.15.0 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-viper/mapstructure/v2 v2.5.0 // indirect github.com/itlightning/dateparse v0.2.1 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect @@ -34,12 +35,13 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pierrec/lz4/v4 v4.1.26 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/twmb/franz-go/pkg/kmsg v1.12.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect - golang.org/x/crypto v0.48.0 // indirect - golang.org/x/sync v0.19.0 // indirect + golang.org/x/crypto v0.49.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) // for dev: use the latest code in ../bgpfix -//replace github.com/bgpfix/bgpfix => ../bgpfix +replace github.com/bgpfix/bgpfix => ../bgpfix diff --git a/go.sum b/go.sum index b932337..d957a80 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,11 @@ github.com/VictoriaMetrics/metrics v1.41.2 h1:pLQ4Mw9TqXFq3ZsZVJkz88JHpjL9LY5NHTY3v2gBNAw= github.com/VictoriaMetrics/metrics v1.41.2/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc= -github.com/bgp/stayrtr v0.6.3 h1:d3MW7FV7yV9bP46aEZhggCcxK8tgooB5XEwa6dYVXM4= -github.com/bgp/stayrtr v0.6.3/go.mod h1:gLXsFU0j2wTKYxDQqXHl9R34FpD1IH59KHwhgFdI3r8= -github.com/bgpfix/bgpfix v0.18.0 h1:UJVxpeJXyPfo+2p4i/LBxkRtTQKonU9tnpYWcUhJoY0= -github.com/bgpfix/bgpfix v0.18.0/go.mod h1:BaNnk8iWEyswYJ1Kd3aPDHk8zVXynNo7KnK4bUhQF0U= +github.com/VictoriaMetrics/metrics v1.43.1 h1:j3Ba4l2K1q3pkvzPqt6aSiQ2DBlAEj3VPVeBtpR3t/Y= +github.com/VictoriaMetrics/metrics v1.43.1/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/buger/jsonparser v1.1.2 h1:frqHqw7otoVbk5M8LlE/L7HTnIq2v9RX6EJ48i9AxJk= +github.com/buger/jsonparser v1.1.2/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -18,8 +18,6 @@ github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPE github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= -github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/itlightning/dateparse v0.2.1 h1:AB0NJTyI0HYcerEUMovKZOiQVBg1mBPxgAnWQwzLP6g= @@ -27,6 +25,8 @@ github.com/itlightning/dateparse v0.2.1/go.mod h1:xHlmL8lT0L9JIBlaKotRwsoDYpKJsk github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo= github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= @@ -34,6 +34,8 @@ github.com/knadh/koanf/providers/posflag v1.0.1 h1:EnMxHSrPkYCFnKgBUl5KBgrjed8gV github.com/knadh/koanf/providers/posflag v1.0.1/go.mod h1:3Wn3+YG3f4ljzRyCUgIwH7G0sZ1pMjCOsNBovrbKmAk= github.com/knadh/koanf/v2 v2.3.3 h1:jLJC8XCRfLC7n4F+ZKKdBsbq1bfXTpuFhf4L7t94D94= github.com/knadh/koanf/v2 v2.3.3/go.mod h1:gRb40VRAbd4iJMYYD5IxZ6hfuopFcXBpc9bbQpZwo28= +github.com/knadh/koanf/v2 v2.3.4 h1:fnynNSDlujWE+v83hAp8wKr/cdoxHLO0629SN+U8Urc= +github.com/knadh/koanf/v2 v2.3.4/go.mod h1:gRb40VRAbd4iJMYYD5IxZ6hfuopFcXBpc9bbQpZwo28= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= @@ -55,6 +57,8 @@ github.com/puzpuzpuz/xsync/v4 v4.4.0/go.mod h1:VJDmTCJMBt8igNxnkQd86r+8KUeN1quSf github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= +github.com/rs/zerolog v1.35.0 h1:VD0ykx7HMiMJytqINBsKcbLS+BJ4WYjz+05us+LRTdI= +github.com/rs/zerolog v1.35.0/go.mod h1:EjML9kdfa/RMA7h/6z6pYmq1ykOuA8/mjWaEvGI+jcw= github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= @@ -74,17 +78,21 @@ github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OL github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= +golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= -golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= +golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= +golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/util/net.go b/pkg/util/net.go index 3595c78..a6855c4 100644 --- a/pkg/util/net.go +++ b/pkg/util/net.go @@ -15,6 +15,9 @@ import ( "github.com/bgpfix/bgpipe/core" ) +// ConnPublish publishes connection information to stage s's pipe KV store, for the benefit of other stages. +// It sets L_LOCAL_ADDR, L_LOCAL_PORT, L_REMOTE_ADDR, L_REMOTE_PORT for the first stage, +// and R_LOCAL_ADDR, R_LOCAL_PORT, R_REMOTE_ADDR, R_REMOTE_PORT for the last stage. func ConnPublish(s *core.StageBase, conn net.Conn) { var todo map[string]string if s.IsFirst { diff --git a/stages/limit.go b/stages/limit.go index de51426..a9dd802 100644 --- a/stages/limit.go +++ b/stages/limit.go @@ -353,7 +353,7 @@ func (s *Limit) checkUnreach(u *msg.Update) (before, after int) { if s.ipv4 { before += len(u.Unreach) u.Unreach = slices.DeleteFunc(u.Unreach, dropUnreach) - after += len(u.Reach) + after += len(u.Unreach) } // prefixes in the MP part? diff --git a/stages/rpki/aspa.go b/stages/rpki/aspa.go new file mode 100644 index 0000000..8ae6a02 --- /dev/null +++ b/stages/rpki/aspa.go @@ -0,0 +1,281 @@ +package rpki + +import ( + "slices" + "strings" + + "github.com/bgpfix/bgpfix/attrs" + "github.com/bgpfix/bgpfix/caps" + "github.com/bgpfix/bgpfix/dir" + "github.com/bgpfix/bgpfix/msg" + "github.com/bgpfix/bgpfix/pipe" +) + +// aspAuthorized return values +const ( + asp_no_att = 0 // CAS has no ASPA record + asp_prov = 1 // PAS is in CAS's provider list + asp_not = 2 // CAS has ASPA but PAS is not listed +) + +// aspAuthorized checks ASPA authorization for a CAS→PAS hop. +// NB: provider lists must be sorted (see nextASPA). +func aspAuthorized(aspa ASPA, cas, pas uint32) int { + provs, ok := aspa[cas] + if !ok { + return asp_no_att + } + if _, found := slices.BinarySearch(provs, pas); found { + return asp_prov + } + return asp_not +} + +// aspVerify verifies the flat AS_PATH against ASPA. +// +// path[0] is the most-recently-traversed AS (direct peer), +// path[N-1] is the origin AS. Returns aspa_valid, aspa_unknown, or aspa_invalid. +// +// downstream=true when received from a provider or RS (downstream direction). +// downstream=false when received from a customer, peer, or RS-client (upstream). +// +// NB: does not check path[0] == neighbor AS (draft §5.4/5.5 step 2). +// The caller must do that check, skipping it for RS peers (RFC 7947). +func aspVerify(aspa ASPA, path []uint32, downstream bool) int { + n := len(path) + if n <= 1 { + return aspa_valid + } + + if !downstream { + // upstream: every hop should go up (each AS sent to its provider) + result := aspa_valid + for i := 0; i < n-1; i++ { + switch aspAuthorized(aspa, path[i+1], path[i]) { + case asp_not: + return aspa_invalid + case asp_no_att: + result = aspa_unknown + } + } + return result + } + + // downstream: find up-ramp from origin + down-ramp from peer. + // Valid if up_ramp + down_ramp covers all N-1 pairs (valley-free). + // + // max counts Provider and NoAttestation until first NotProvider; + // min counts only leading Provider hops (stops at first NoAttestation). + maxUp, minUp := 0, 0 + exact := true + for i := n - 2; i >= 0; i-- { + auth := aspAuthorized(aspa, path[i+1], path[i]) + if auth == asp_not { + break + } + maxUp++ + if auth == asp_prov && exact { + minUp++ + } else { + exact = false + } + } + + maxDown, minDown := 0, 0 + exact = true + for i := 0; i < n-1; i++ { + auth := aspAuthorized(aspa, path[i], path[i+1]) + if auth == asp_not { + break + } + maxDown++ + if auth == asp_prov && exact { + minDown++ + } else { + exact = false + } + } + + if maxUp+maxDown < n-1 { + return aspa_invalid + } + if minUp+minDown < n-1 { + return aspa_unknown + } + return aspa_valid +} + +// aspPeerASN returns the peer's ASN from its OPEN message, or 0 if unavailable. +func aspPeerASN(p *pipe.Pipe, d dir.Dir) uint32 { + om := p.LineFor(d).Open.Load() + if om == nil { + return 0 + } + return uint32(om.GetASN()) +} + +// aspPeerRole reads the BGP Role capability from the peer's OPEN message. +func aspPeerRole(p *pipe.Pipe, d dir.Dir) (byte, bool) { + om := p.LineFor(d).Open.Load() + if om == nil { + return 0, false + } + c, ok := om.Caps.Get(caps.CAP_ROLE).(*caps.Role) + if !ok || c == nil { + return 0, false + } + return c.Role, true +} + +// aspIsDownstream maps the peer's BGP Role to the downstream flag. +// Per RFC 9234: PROVIDER → we are their customer → downstream. +// Per ASPA draft §6.3: RS is treated like a provider. +func aspIsDownstream(role byte) bool { + return role == caps.ROLE_PROVIDER || role == caps.ROLE_RS +} + +// parseRoleName converts a --aspa-role flag string to a caps.ROLE_* constant. +func parseRoleName(name string) (byte, bool) { + switch strings.ToLower(name) { + case "provider": + return caps.ROLE_PROVIDER, true + case "rs": + return caps.ROLE_RS, true + case "rs-client": + return caps.ROLE_RS_CLIENT, true + case "customer": + return caps.ROLE_CUSTOMER, true + case "peer": + return caps.ROLE_PEER, true + default: + return 0, false + } +} + +// validateAspa performs ASPA path validation for the UPDATE message. +// Returns false to drop, true to keep. +func (s *Rpki) validateAspa(m *msg.Msg, u *msg.Update, tags map[string]string) bool { + aspa := s.aspa.Load() + if aspa == nil || len(*aspa) == 0 { + return true // no ASPA data + } + if !u.HasReach() { + return true // withdrawal-only, no AS_PATH to validate + } + + // NB: role resolved once per direction on first UPDATE. BGP guarantees OPEN + // is exchanged before any UPDATE. If --aspa-role auto and peer didn't + // send BGP Role capability, ASPA is permanently skipped for this direction. + di := m.Dir & 1 // direction index: 0=R, 1=L + s.peer_role_mu[di].Do(func() { + if s.aspa_role != "auto" { + // NB: validated in Attach() + role, _ := parseRoleName(s.aspa_role) + s.peer_role[di] = int(role) + s.peer_role_ok[di] = true + s.peer_down[di] = aspIsDownstream(role) + s.Info().Str("role", s.aspa_role).Str("dir", m.Dir.String()).Msg("ASPA: peer role set via --aspa-role") + } else { + role, ok := aspPeerRole(s.P, m.Dir) + if !ok { + s.Warn().Str("dir", m.Dir.String()).Msg("ASPA: peer did not send BGP Role capability, skipping (use --aspa-role to override)") + s.peer_role[di] = -1 + return + } + s.peer_role[di] = int(role) + s.peer_role_ok[di] = true + s.peer_down[di] = aspIsDownstream(role) + s.Info().Int("role", int(role)).Bool("downstream", s.peer_down[di]).Str("dir", m.Dir.String()).Msg("ASPA: peer role detected") + } + }) + if !s.peer_role_ok[di] { + return true + } + + // empty AS_PATH = iBGP or locally-originated, nothing to validate + if u.AsPath().Len() == 0 { + return true + } + + flat := u.AsPath().Unique() + + // verify path + var result int + if flat == nil { + result = aspa_invalid // AS_SET present → invalid per ASPA spec §3 + } else if len(flat) > 1 { + // NB: per draft §5.4/5.5 step 2, path[0] must equal neighbor AS. + // RS peers don't prepend their ASN (RFC 7947). + if s.peer_role[di] != int(caps.ROLE_RS) { + peerASN := aspPeerASN(s.P, m.Dir) + if peerASN == 0 { + s.Warn().Msg("ASPA: peer ASN unknown, first-hop check skipped") + } + if peerASN != 0 && flat[0] != peerASN { + result = aspa_invalid + } else { + result = aspVerify(*aspa, flat, s.peer_down[di]) + } + } else { + result = aspVerify(*aspa, flat, s.peer_down[di]) + } + } else { + result = aspa_valid // single-hop + } + + // metrics + switch result { + case aspa_valid: + s.cnt_aspa_valid.Inc() + case aspa_unknown: + s.cnt_aspa_unk.Inc() + case aspa_invalid: + s.cnt_aspa_inv.Inc() + } + + // tag + if s.aspa_tag { + switch result { + case aspa_valid: + tags["aspa/status"] = "VALID" + case aspa_unknown: + tags["aspa/status"] = "UNKNOWN" + case aspa_invalid: + tags["aspa/status"] = "INVALID" + } + m.Edit() + } + + if result != aspa_invalid { + return true + } + + // event + if s.aspa_ev != "" { + s.Event(s.aspa_ev, m) + } + + // action: ASPA condemns the entire path, not individual prefixes + switch s.aspa_act { + case act_drop: + return false + case act_withdraw: + // move all reachable prefixes to withdrawn + reach := slices.Clone(u.Reach) + u.Reach = nil + if mpp := u.ReachMP().Prefixes(); mpp != nil { + reach = append(reach, mpp.Prefixes...) + mpp.Prefixes = nil + } + if len(reach) > 0 { + u.AddUnreach(reach...) + } + // NB: pure withdrawal must not carry path attributes (RFC 4271 §4.3) + if !u.HasReach() { + u.Attrs.Filter(attrs.ATTR_MP_UNREACH) + } + m.Edit() + } + + return true +} diff --git a/stages/rpki/aspa_test.go b/stages/rpki/aspa_test.go new file mode 100644 index 0000000..f00ebf3 --- /dev/null +++ b/stages/rpki/aspa_test.go @@ -0,0 +1,220 @@ +package rpki + +import ( + "testing" + + "github.com/bgpfix/bgpfix/caps" + "github.com/stretchr/testify/require" +) + +// --- aspAuthorized tests --- + +func TestAspAuthorized_Provider(t *testing.T) { + aspa := ASPA{ + 65001: {65100, 65200}, + } + require.Equal(t, asp_prov, aspAuthorized(aspa, 65001, 65100)) + require.Equal(t, asp_prov, aspAuthorized(aspa, 65001, 65200)) +} + +func TestAspAuthorized_NotProvider(t *testing.T) { + aspa := ASPA{ + 65001: {65100, 65200}, + } + require.Equal(t, asp_not, aspAuthorized(aspa, 65001, 65999)) +} + +func TestAspAuthorized_NoAttestation(t *testing.T) { + aspa := ASPA{ + 65001: {65100}, + } + // CAS 65002 has no ASPA record + require.Equal(t, asp_no_att, aspAuthorized(aspa, 65002, 65100)) +} + +func TestAspAuthorized_EmptyProviderList(t *testing.T) { + aspa := ASPA{ + 65001: {}, // has record but no providers + } + require.Equal(t, asp_not, aspAuthorized(aspa, 65001, 65100)) +} + +// --- aspVerify upstream tests --- + +func TestAspVerify_Upstream_Valid(t *testing.T) { + // path: 65001 → 65002 → 65003 (origin) + // 65003 says 65002 is my provider, 65002 says 65001 is my provider + aspa := ASPA{ + 65003: {65002}, + 65002: {65001}, + } + path := []uint32{65001, 65002, 65003} + require.Equal(t, aspa_valid, aspVerify(aspa, path, false)) +} + +func TestAspVerify_Upstream_Invalid(t *testing.T) { + // 65003 says 65002 is NOT its provider (65099 is) + aspa := ASPA{ + 65003: {65099}, + 65002: {65001}, + } + path := []uint32{65001, 65002, 65003} + require.Equal(t, aspa_invalid, aspVerify(aspa, path, false)) +} + +func TestAspVerify_Upstream_Unknown(t *testing.T) { + // 65002 has no ASPA record → unknown + aspa := ASPA{ + 65003: {65002}, + } + path := []uint32{65001, 65002, 65003} + require.Equal(t, aspa_unknown, aspVerify(aspa, path, false)) +} + +func TestAspVerify_Upstream_SingleHop(t *testing.T) { + aspa := ASPA{} + require.Equal(t, aspa_valid, aspVerify(aspa, []uint32{65001}, false)) +} + +func TestAspVerify_Upstream_TwoHop_Valid(t *testing.T) { + // path: 65001 → 65002. 65002 says 65001 is provider. + aspa := ASPA{ + 65002: {65001}, + } + require.Equal(t, aspa_valid, aspVerify(aspa, []uint32{65001, 65002}, false)) +} + +func TestAspVerify_Upstream_TwoHop_Invalid(t *testing.T) { + // path: 65001 → 65002. 65002 says 65099 is provider, not 65001. + aspa := ASPA{ + 65002: {65099}, + } + require.Equal(t, aspa_invalid, aspVerify(aspa, []uint32{65001, 65002}, false)) +} + +// --- aspVerify downstream tests --- + +func TestAspVerify_Downstream_ValleyFree(t *testing.T) { + // path: 65001 → 65002 → 65003 (origin) + // valley-free: origin goes up to 65002, then 65002 goes down to 65001 + // up-ramp: 65003→65002 (65003 says 65002 is provider) + // down-ramp: 65001→65002 (65001 says 65002 is provider) + aspa := ASPA{ + 65003: {65002}, + 65001: {65002}, + } + path := []uint32{65001, 65002, 65003} + require.Equal(t, aspa_valid, aspVerify(aspa, path, true)) +} + +func TestAspVerify_Downstream_NotValleyFree(t *testing.T) { + // path: 65001 → 65002 → 65003 (origin) + // all ASes have ASPA records but the path is not valley-free: + // up-ramp: aspAuthorized(65003, 65002) → 65003 says 65099, not 65002 → NotProvider → break (maxUp=0) + // down-ramp: aspAuthorized(65001, 65002) → 65001 says 65099, not 65002 → NotProvider → break (maxDown=0) + // maxUp + maxDown = 0 < 2 → invalid + aspa := ASPA{ + 65003: {65099}, + 65002: {65099}, + 65001: {65099}, + } + path := []uint32{65001, 65002, 65003} + require.Equal(t, aspa_invalid, aspVerify(aspa, path, true)) +} + +func TestAspVerify_Downstream_Unknown(t *testing.T) { + // path: 65001 → 65002 → 65003 (origin) + // 65003 says 65002 is provider (up-ramp=1) + // 65001 has no ASPA (down max=1, min=0) + // maxUp + maxDown = 1 + 1 = 2 >= 2 ✓ + // minUp + minDown = 1 + 0 = 1 < 2 → unknown + aspa := ASPA{ + 65003: {65002}, + } + path := []uint32{65001, 65002, 65003} + require.Equal(t, aspa_unknown, aspVerify(aspa, path, true)) +} + +func TestAspVerify_Downstream_LongValleyFree(t *testing.T) { + // 4-hop valley-free path: 65001 → 65002 → 65003 → 65004 (origin) + // origin goes up: 65004→65003 (provider), 65003→65002 (provider) + // peer goes down: 65001→65002 (provider) + aspa := ASPA{ + 65004: {65003}, + 65003: {65002}, + 65001: {65002}, + } + path := []uint32{65001, 65002, 65003, 65004} + require.Equal(t, aspa_valid, aspVerify(aspa, path, true)) +} + +func TestAspVerify_Downstream_PeerPeering(t *testing.T) { + // 3-hop with peering at top: 65001 → 65002 → 65003 (origin) + // all ASes have ASPA records so all lookups are definitive: + // up-ramp: aspAuthorized(65003, 65002) → 65003 says 65099, not 65002 → NotProvider → maxUp=0 + // down-ramp: aspAuthorized(65001, 65002) → 65001 says 65002 → Provider → maxDown=1 + // maxUp+maxDown=0+1=1 < 2 → invalid + aspa := ASPA{ + 65003: {65099}, // 65003's provider is 65099, not 65002 + 65002: {65099}, // 65002 has record (needed for definitive NotProvider results) + 65001: {65002}, + } + path := []uint32{65001, 65002, 65003} + require.Equal(t, aspa_invalid, aspVerify(aspa, path, true)) +} + +func TestAspVerify_EmptyASPA(t *testing.T) { + // no ASPA data → all hops are NoAttestation → unknown + aspa := ASPA{} + path := []uint32{65001, 65002, 65003} + require.Equal(t, aspa_unknown, aspVerify(aspa, path, false)) + require.Equal(t, aspa_unknown, aspVerify(aspa, path, true)) +} + +// --- parseRoleName tests --- + +func TestParseRoleName(t *testing.T) { + tests := []struct { + name string + ok bool + }{ + {"provider", true}, + {"Provider", true}, + {"PROVIDER", true}, + {"rs", true}, + {"RS", true}, + {"rs-client", true}, + {"customer", true}, + {"peer", true}, + {"unknown", false}, + {"auto", false}, + {"", false}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + _, ok := parseRoleName(tc.name) + require.Equal(t, tc.ok, ok) + }) + } +} + +// --- aspIsDownstream tests --- + +func TestAspIsDownstream(t *testing.T) { + tests := []struct { + name string + role byte + downstream bool + }{ + {"provider is downstream", caps.ROLE_PROVIDER, true}, + {"rs is downstream", caps.ROLE_RS, true}, + {"rs-client is not downstream", caps.ROLE_RS_CLIENT, false}, + {"customer is not downstream", caps.ROLE_CUSTOMER, false}, + {"peer is not downstream", caps.ROLE_PEER, false}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.downstream, aspIsDownstream(tc.role)) + }) + } +} diff --git a/stages/rpki/file.go b/stages/rpki/file.go index 8f8e6da..d6d66b3 100644 --- a/stages/rpki/file.go +++ b/stages/rpki/file.go @@ -11,22 +11,19 @@ import ( "time" ) -// fileRun does initial load and polls the file for changes +// fileRun does initial load and polls the file for changes. func (s *Rpki) fileRun() { - // first load - err := s.fileLoad() - if err != nil { - s.Fatal().Err(err).Msg("could not load the ROA file") + if err := s.fileLoad(); err != nil { + s.Fatal().Err(err).Msg("could not load RPKI data file") } - // keep polling ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: if err := s.fileLoad(); err != nil { - s.Err(err).Msg("failed to re-load the ROA file") + s.Err(err).Msg("failed to re-load RPKI data file") } case <-s.Ctx.Done(): return @@ -34,9 +31,8 @@ func (s *Rpki) fileRun() { } } -// fileLoad loads ROA data from file +// fileLoad loads VRP/ASPA data from file. func (s *Rpki) fileLoad() error { - // stat file, check mod time fi, err := os.Stat(s.file) if err != nil { return err @@ -45,7 +41,6 @@ func (s *Rpki) fileLoad() error { return nil } - // read file, check contents data, err := os.ReadFile(s.file) if err != nil { return err @@ -55,38 +50,37 @@ func (s *Rpki) fileLoad() error { return nil } - // restart from scratch s.nextFlush() if err := s.fileParse(data); err != nil { return err } - // apply s.nextApply() s.file_mod = fi.ModTime() s.file_hash = hash - return nil } -// fileParse parses ROA data from JSON or CSV +// fileParse parses VRP/ASPA data from JSON or CSV. func (s *Rpki) fileParse(data []byte) error { if len(data) > 0 && data[0] == '{' { return s.fileParseJSON(data) - } else { - return s.fileParseCSV(data) } + return s.fileParseCSV(data) } -// fileParseJSON parses Routinator-style JSON -// Format: {"roas": [{"prefix": "192.0.2.0/24", "maxLength": 24, "asn": "AS65001"}, ...]} +// fileParseJSON parses Routinator-style JSON with VRPs and ASPA records. func (s *Rpki) fileParseJSON(data []byte) error { var doc struct { ROAs []struct { Prefix string `json:"prefix"` MaxLength int `json:"maxLength"` - ASN any `json:"asn"` // can be string "AS65001" or int 65001 + ASN any `json:"asn"` } `json:"roas"` + ASPAs []struct { + CustomerASID uint32 `json:"customer_asid"` + ProviderASIDs []uint32 `json:"provider_asids"` + } `json:"aspas"` } if err := json.Unmarshal(data, &doc); err != nil { @@ -101,20 +95,21 @@ func (s *Rpki) fileParseJSON(data []byte) error { } prefix = prefix.Masked() - // Parse ASN (handle both "AS65001" and 65001) + if roa.MaxLength < 0 || roa.MaxLength > 128 { + s.Warn().Str("prefix", roa.Prefix).Int("maxLength", roa.MaxLength).Msg("maxLength out of range, skipping") + continue + } + var asn uint32 switch v := roa.ASN.(type) { case string: - v = strings.ToLower(v) - v = strings.TrimPrefix(v, "as") + v = strings.TrimPrefix(strings.ToLower(v), "as") n, err := strconv.ParseUint(v, 10, 32) if err != nil { s.Warn().Str("asn", fmt.Sprint(roa.ASN)).Msg("invalid ASN, skipping") continue } asn = uint32(n) - case int: - asn = uint32(v) case float64: asn = uint32(v) default: @@ -122,7 +117,15 @@ func (s *Rpki) fileParseJSON(data []byte) error { continue } - s.nextRoa(true, prefix, uint8(roa.MaxLength), asn) + s.nextVRP(true, prefix, uint8(roa.MaxLength), asn) + } + + for _, aspa := range doc.ASPAs { + if aspa.CustomerASID == 0 { + s.Warn().Msg("ASPA entry with zero customer ASN, skipping") + continue + } + s.nextASPA(true, aspa.CustomerASID, aspa.ProviderASIDs) } return nil @@ -137,7 +140,6 @@ func (s *Rpki) fileParseCSV(data []byte) error { continue } - // Skip header if i == 0 && strings.Contains(strings.ToLower(line), "prefix") { continue } @@ -160,16 +162,19 @@ func (s *Rpki) fileParseCSV(data []byte) error { s.Warn().Int("line", i+1).Err(err).Msg("invalid maxLength, skipping") continue } + if maxLen < 0 || maxLen > 128 { + s.Warn().Int("line", i+1).Int("maxLength", maxLen).Msg("maxLength out of range, skipping") + continue + } - asnStr := strings.ToLower(strings.TrimSpace(parts[2])) - asnStr = strings.TrimPrefix(asnStr, "as") + asnStr := strings.TrimPrefix(strings.ToLower(strings.TrimSpace(parts[2])), "as") asn, err := strconv.ParseUint(asnStr, 10, 32) if err != nil { s.Warn().Err(err).Int("line", i+1).Msg("invalid ASN, skipping") continue } - s.nextRoa(true, prefix, uint8(maxLen), uint32(asn)) + s.nextVRP(true, prefix, uint8(maxLen), uint32(asn)) } return nil diff --git a/stages/rpki/file_test.go b/stages/rpki/file_test.go index 2ce1235..58aa350 100644 --- a/stages/rpki/file_test.go +++ b/stages/rpki/file_test.go @@ -23,7 +23,7 @@ func TestFileParseJSON_ValidRoutinatorFormat(t *testing.T) { // Check IPv4 entries if len(s.next4) != 2 { - t.Errorf("expected 2 IPv4 ROAs, got %d", len(s.next4)) + t.Errorf("expected 2 IPv4 VRPs, got %d", len(s.next4)) } p1 := netip.MustParsePrefix("192.0.2.0/24") @@ -42,7 +42,7 @@ func TestFileParseJSON_ValidRoutinatorFormat(t *testing.T) { // Check IPv6 entries if len(s.next6) != 1 { - t.Errorf("expected 1 IPv6 ROA, got %d", len(s.next6)) + t.Errorf("expected 1 IPv6 VRP, got %d", len(s.next6)) } p3 := netip.MustParsePrefix("2001:db8::/32") @@ -133,10 +133,10 @@ func TestFileParseCSV_Valid(t *testing.T) { } if len(s.next4) != 2 { - t.Errorf("expected 2 IPv4 ROAs, got %d", len(s.next4)) + t.Errorf("expected 2 IPv4 VRPs, got %d", len(s.next4)) } if len(s.next6) != 1 { - t.Errorf("expected 1 IPv6 ROA, got %d", len(s.next6)) + t.Errorf("expected 1 IPv6 VRP, got %d", len(s.next6)) } // Verify specific entries @@ -158,7 +158,7 @@ func TestFileParseCSV_NoHeader(t *testing.T) { } if len(s.next4) != 2 { - t.Errorf("expected 2 IPv4 ROAs, got %d", len(s.next4)) + t.Errorf("expected 2 IPv4 VRPs, got %d", len(s.next4)) } } @@ -179,7 +179,7 @@ func TestFileParseCSV_Comments(t *testing.T) { } if len(s.next4) != 2 { - t.Errorf("expected 2 IPv4 ROAs (comments ignored), got %d", len(s.next4)) + t.Errorf("expected 2 IPv4 VRPs (comments ignored), got %d", len(s.next4)) } } @@ -218,7 +218,7 @@ invalid line // Should have 2 valid entries (first and last) // Others are skipped due to various validation errors if len(s.next4) != 2 { - t.Errorf("expected 2 valid IPv4 ROAs, got %d", len(s.next4)) + t.Errorf("expected 2 valid IPv4 VRPs, got %d", len(s.next4)) } } @@ -257,10 +257,10 @@ func TestFileParse_AutoDetect(t *testing.T) { } if len(s.next4) != tt.wantV4 { - t.Errorf("got %d IPv4 ROAs, want %d", len(s.next4), tt.wantV4) + t.Errorf("got %d IPv4 VRPs, want %d", len(s.next4), tt.wantV4) } if len(s.next6) != tt.wantV6 { - t.Errorf("got %d IPv6 ROAs, want %d", len(s.next6), tt.wantV6) + t.Errorf("got %d IPv6 VRPs, want %d", len(s.next6), tt.wantV6) } }) } diff --git a/stages/rpki/next.go b/stages/rpki/next.go index a521c97..5fc1f10 100644 --- a/stages/rpki/next.go +++ b/stages/rpki/next.go @@ -8,61 +8,80 @@ import ( ) func (s *Rpki) nextFlush() { - s.next4 = make(ROA) - s.next6 = make(ROA) + s.next4 = make(VRPs) + s.next6 = make(VRPs) + s.next_aspa = make(ASPA) } func (s *Rpki) nextApply() { - // publish next as current - roa4, roa6 := s.next4, s.next6 - s.roa4.Store(&roa4) - s.roa6.Store(&roa6) + v4, v6, aspa := s.next4, s.next6, s.next_aspa + s.vrp4.Store(&v4) + s.vrp6.Store(&v6) + s.aspa.Store(&aspa) - // signal the ROA is ready - s.Info().Int("v4", len(roa4)).Int("v6", len(roa6)).Msg("ROA cache updated") - util.Close(s.roa_done) + s.Info().Int("v4", len(v4)).Int("v6", len(v6)).Int("aspa", len(aspa)).Msg("RPKI cache updated") + util.Close(s.vrp_done) - // make next copies of current maps - s.next4 = make(ROA, len(roa4)) - for p, entries := range roa4 { + // copy-on-write: clone current into next for incremental updates + s.next4 = make(VRPs, len(v4)) + for p, entries := range v4 { if len(entries) > 0 { s.next4[p] = slices.Clone(entries) } } - s.next6 = make(ROA, len(roa6)) - for p, entries := range roa6 { + s.next6 = make(VRPs, len(v6)) + for p, entries := range v6 { if len(entries) > 0 { s.next6[p] = slices.Clone(entries) } } -} - -func (s *Rpki) nextRoa(add bool, prefix netip.Prefix, maxLen uint8, asn uint32) { - next := s.next4 - - // check maxLen - if ml := int(maxLen); ml < prefix.Bits() || ml > 128 { - s.Warn().Str("prefix", prefix.String()).Int("maxLength", ml).Msg("invalid MaxLength, skipping") - return + s.next_aspa = make(ASPA, len(aspa)) + for cas, provs := range aspa { + s.next_aspa[cas] = slices.Clone(provs) } +} - // is IPv6? +func (s *Rpki) nextVRP(add bool, prefix netip.Prefix, maxLen uint8, asn uint32) { p := prefix.Masked() + next := s.next4 + maxBits := 32 if p.Addr().Is6() { next = s.next6 + maxBits = 128 } - // entry already exists? - entry := ROAEntry{MaxLen: maxLen, ASN: asn} + if ml := int(maxLen); ml < prefix.Bits() || ml > maxBits { + s.Warn().Str("prefix", prefix.String()).Int("maxLength", ml).Msg("invalid maxLength, skipping") + return + } + + entry := VRP{MaxLen: maxLen, ASN: asn} i := slices.Index(next[p], entry) - if add { // add iff really novel + if add { if i < 0 { next[p] = append(next[p], entry) } - } else { // drop if really exists + } else { if i >= 0 { next[p] = slices.Delete(next[p], i, i+1) } } } + +func (s *Rpki) nextASPA(add bool, cas uint32, providers []uint32) { + if add { + // normalize: remove zeros, deduplicate, sort for BinarySearch + norm := make([]uint32, 0, len(providers)) + for _, p := range providers { + if p != 0 { + norm = append(norm, p) + } + } + slices.Sort(norm) + norm = slices.Compact(norm) + s.next_aspa[cas] = norm + } else { + delete(s.next_aspa, cas) + } +} diff --git a/stages/rpki/next_test.go b/stages/rpki/next_test.go index 7ba9aac..6938977 100644 --- a/stages/rpki/next_test.go +++ b/stages/rpki/next_test.go @@ -10,10 +10,10 @@ func TestNextAddBasic(t *testing.T) { // Test IPv4 addition p4 := netip.MustParsePrefix("192.0.2.0/24") - s.nextRoa(true, p4, 24, 65001) + s.nextVRP(true, p4, 24, 65001) if len(s.next4) != 1 { - t.Fatalf("expected 1 IPv4 ROA, got %d", len(s.next4)) + t.Fatalf("expected 1 IPv4 VRP, got %d", len(s.next4)) } if entries := s.next4[p4]; len(entries) != 1 { t.Fatalf("expected 1 entry for prefix, got %d", len(entries)) @@ -24,10 +24,10 @@ func TestNextAddBasic(t *testing.T) { // Test IPv6 addition p6 := netip.MustParsePrefix("2001:db8::/32") - s.nextRoa(true, p6, 48, 65002) + s.nextVRP(true, p6, 48, 65002) if len(s.next6) != 1 { - t.Fatalf("expected 1 IPv6 ROA, got %d", len(s.next6)) + t.Fatalf("expected 1 IPv6 VRP, got %d", len(s.next6)) } if entries := s.next6[p6]; len(entries) != 1 { t.Fatalf("expected 1 entry for prefix, got %d", len(entries)) @@ -40,8 +40,8 @@ func TestNextAddDuplicates(t *testing.T) { p := netip.MustParsePrefix("192.0.2.0/24") // Add same VRP twice - s.nextRoa(true, p, 24, 65001) - s.nextRoa(true, p, 24, 65001) + s.nextVRP(true, p, 24, 65001) + s.nextVRP(true, p, 24, 65001) if len(s.next4[p]) != 1 { t.Errorf("expected 1 entry (duplicate ignored), got %d", len(s.next4[p])) @@ -54,9 +54,9 @@ func TestNextAddMultipleOrigins(t *testing.T) { p := netip.MustParsePrefix("192.0.2.0/24") // Same prefix, different ASNs (MOAS scenario) - s.nextRoa(true, p, 24, 65001) - s.nextRoa(true, p, 24, 65002) - s.nextRoa(true, p, 25, 65001) // Same prefix, different maxLen + s.nextVRP(true, p, 24, 65001) + s.nextVRP(true, p, 24, 65002) + s.nextVRP(true, p, 25, 65001) // Same prefix, different maxLen if len(s.next4[p]) != 3 { t.Errorf("expected 3 entries, got %d", len(s.next4[p])) @@ -69,9 +69,9 @@ func TestNextDel(t *testing.T) { p := netip.MustParsePrefix("192.0.2.0/24") // Add then delete - s.nextRoa(true, p, 24, 65001) - s.nextRoa(true, p, 24, 65002) - s.nextRoa(false, p, 24, 65001) + s.nextVRP(true, p, 24, 65001) + s.nextVRP(true, p, 24, 65002) + s.nextVRP(false, p, 24, 65001) entries := s.next4[p] if len(entries) != 1 { @@ -86,10 +86,10 @@ func TestNextDelNonExistent(t *testing.T) { s := newTestRpkiSimple() p := netip.MustParsePrefix("192.0.2.0/24") - s.nextRoa(true, p, 24, 65001) + s.nextVRP(true, p, 24, 65001) // Delete non-existent entry (should be no-op) - s.nextRoa(false, p, 24, 65999) + s.nextVRP(false, p, 24, 65999) if len(s.next4[p]) != 1 { t.Errorf("expected 1 entry (delete ignored), got %d", len(s.next4[p])) @@ -98,24 +98,24 @@ func TestNextDelNonExistent(t *testing.T) { func TestNextApply(t *testing.T) { s := newTestRpki() - s.roa_done = make(chan bool) + s.vrp_done = make(chan bool) - // Add some ROAs - s.nextRoa(true, netip.MustParsePrefix("192.0.2.0/24"), 24, 65001) - s.nextRoa(true, netip.MustParsePrefix("2001:db8::/32"), 48, 65002) + // Add some VRPs + s.nextVRP(true, netip.MustParsePrefix("192.0.2.0/24"), 24, 65001) + s.nextVRP(true, netip.MustParsePrefix("2001:db8::/32"), 48, 65002) // Apply (publishes next -> current) s.nextApply() // Check current caches were updated - roa4 := s.roa4.Load() - roa6 := s.roa6.Load() + v4 := s.vrp4.Load() + v6 := s.vrp6.Load() - if len(*roa4) != 1 { - t.Errorf("expected 1 IPv4 ROA in current, got %d", len(*roa4)) + if len(*v4) != 1 { + t.Errorf("expected 1 IPv4 VRP in current, got %d", len(*v4)) } - if len(*roa6) != 1 { - t.Errorf("expected 1 IPv6 ROA in current, got %d", len(*roa6)) + if len(*v6) != 1 { + t.Errorf("expected 1 IPv6 VRP in current, got %d", len(*v6)) } // Check next was cloned (for incremental updates) @@ -125,12 +125,42 @@ func TestNextApply(t *testing.T) { } } +func TestNextVRP_InvalidMaxLength(t *testing.T) { + s := newTestRpki() + + // maxLen=33 exceeds IPv4 max of 32 → should be rejected + p4 := netip.MustParsePrefix("192.0.2.0/24") + s.nextVRP(true, p4, 33, 65001) + if len(s.next4[p4]) != 0 { + t.Error("maxLen=33 should be rejected for IPv4") + } + + // maxLen=129 exceeds IPv6 max of 128 → should be rejected + p6 := netip.MustParsePrefix("2001:db8::/32") + s.nextVRP(true, p6, 129, 65002) + if len(s.next6[p6]) != 0 { + t.Error("maxLen=129 should be rejected for IPv6") + } + + // maxLen < prefix length → should be rejected + s.nextVRP(true, p4, 20, 65001) + if len(s.next4[p4]) != 0 { + t.Error("maxLen < prefix length should be rejected") + } + + // valid maxLen should be accepted + s.nextVRP(true, p4, 32, 65001) + if len(s.next4[p4]) != 1 { + t.Error("maxLen=32 should be accepted for /24 IPv4") + } +} + func TestPrefixMasking(t *testing.T) { s := newTestRpkiSimple() // Add unmasked prefix (should be masked automatically) p := netip.MustParsePrefix("192.0.2.123/24") - s.nextRoa(true, p, 24, 65001) + s.nextVRP(true, p, 24, 65001) // Should be stored as masked prefix masked := netip.MustParsePrefix("192.0.2.0/24") diff --git a/stages/rpki/rpki.go b/stages/rpki/rpki.go index 8e982a9..58c76c1 100644 --- a/stages/rpki/rpki.go +++ b/stages/rpki/rpki.go @@ -11,7 +11,6 @@ import ( "sync/atomic" "time" - rtrlib "github.com/bgp/stayrtr/lib" "github.com/VictoriaMetrics/metrics" "github.com/bgpfix/bgpfix/msg" "github.com/bgpfix/bgpfix/pipe" @@ -20,86 +19,118 @@ import ( ) const ( - minROALenV4 = 8 // No ROAs shorter than /8 for IPv4 - minROALenV6 = 12 // No ROAs shorter than /12 for IPv6 + min_vrp_v4 = 8 // no VRPs shorter than /8 for IPv4 + min_vrp_v6 = 12 // no VRPs shorter than /12 for IPv6 ) -// RPKI validation status +// ROV validation status const ( - rpki_valid = iota // Prefix+origin covered by valid ROA - rpki_invalid // Prefix+origin conflicts with ROA - rpki_not_found // No ROA covers this prefix + rov_valid = iota // prefix+origin covered by valid VRP + rov_invalid // prefix+origin conflicts with VRP + rov_not_found // no VRP covers this prefix ) -// what to do with invalid prefixes +// ASPA path validation status const ( - rpki_withdraw = iota // Move invalid prefixes to withdrawn (RFC 7606) - rpki_drop // Drop entire UPDATE if any prefix invalid - rpki_filter // Remove invalid prefixes from the reachable prefixes - rpki_split // Split invalid prefixes into separate UPDATE withdrawing them - rpki_keep // Keep invalid prefixes unchanged + aspa_valid = iota // path is valley-free and fully attested + aspa_unknown // insufficient attestation + aspa_invalid // proven route leak ) -// ROAEntry represents a single VRP (Validated ROA Payload) -type ROAEntry struct { +// action for invalid prefixes/paths +const ( + act_withdraw = iota // move invalid prefixes to withdrawn + act_drop // drop entire UPDATE message + act_filter // remove invalid prefixes silently (ROV only) + act_split // split invalid prefixes to separate UPDATE (ROV only) + act_keep // keep unchanged (tag only) +) + +// VRP represents a single Validated ROA Payload +type VRP struct { MaxLen uint8 ASN uint32 } -// ROA maps prefixes to lists of ROA entries -type ROA = map[netip.Prefix][]ROAEntry +// VRPs maps prefixes to lists of VRP entries +type VRPs = map[netip.Prefix][]VRP + +// ASPA maps Customer ASN to its list of Provider ASNs +type ASPA = map[uint32][]uint32 -// Rpki is a stage that validates BGP UPDATE messages using RPKI data +// Rpki validates BGP UPDATE messages using RPKI data (ROV + ASPA) type Rpki struct { *core.StageBase - in_split *pipe.Input // used for --invalid=split + split *pipe.Input // used for --invalid=split - // config + // ROV config rtr string file string - invalid int + rov_act int strict bool tag bool event string - // current ROA cache - roa_done chan bool // is ROA cache ready for use? - roa4 atomic.Pointer[ROA] // current IPv4 ROA - roa6 atomic.Pointer[ROA] // current IPv6 ROA - next4 ROA // next roa4 (pending apply) - next6 ROA // next roa6 (pending apply) + // ASPA config (requires --aspa) + aspa_on bool // true if --aspa flag is set + aspa_act int // action for ASPA INVALID paths + aspa_tag bool // add aspa/status tag + aspa_ev string // emit event on ASPA INVALID + aspa_role string // --aspa-role flag value + + // resolved peer role (per-direction, set once on first UPDATE per dir) + peer_role [2]int // caps.ROLE_* constant; -1 = unresolved + peer_role_mu [2]sync.Once + peer_role_ok [2]bool // true if resolved successfully + peer_down [2]bool // true if peer is provider/RS (downstream path) + + // VRP cache (current = atomic pointer; next = pending) + vrp_done chan bool + vrp4 atomic.Pointer[VRPs] + vrp6 atomic.Pointer[VRPs] + next4 VRPs + next6 VRPs + + // ASPA cache + aspa atomic.Pointer[ASPA] + next_aspa ASPA // prometheus metrics - cMessages *metrics.Counter // bgpipe_rpki_messages_total - cValid *metrics.Counter // bgpipe_rpki_valid_total - cInvalid *metrics.Counter // bgpipe_rpki_invalid_total - cNotFound *metrics.Counter // bgpipe_rpki_not_found_total + cnt_msg *metrics.Counter + cnt_rov_valid *metrics.Counter + cnt_rov_inv *metrics.Counter + cnt_rov_nf *metrics.Counter + cnt_aspa_valid *metrics.Counter + cnt_aspa_unk *metrics.Counter + cnt_aspa_inv *metrics.Counter // file watcher state - file_mod time.Time // last modification time - file_hash [32]byte // last file hash + file_mod time.Time + file_hash [32]byte - // RTR client state + // RTR client state (protected by rtr_mu) rtr_mu sync.Mutex - rtr_conn net.Conn // RTR connection - rtr_client *rtrlib.ClientSession // RTR client - rtr_sessid uint16 // last session ID from server - rtr_serial uint32 // last serial number from server - rtr_valid bool // true if we have a valid serial to use + rtr_conn net.Conn + rtr_valid bool + rtr_serial uint32 // last applied serial + rtr_sessid uint16 // last applied session ID + rtr_has bool // true once first EndOfData received } func NewRpki(parent *core.StageBase) core.Stage { s := &Rpki{ StageBase: parent, - roa_done: make(chan bool), + vrp_done: make(chan bool), + peer_role: [2]int{-1, -1}, } - s.roa4.Store(new(ROA)) - s.roa6.Store(new(ROA)) + s.vrp4.Store(new(VRPs)) + s.vrp6.Store(new(VRPs)) + s.aspa.Store(new(ASPA)) s.nextFlush() o := &s.Options - o.Descr = "validate UPDATEs using RPKI" + o.Descr = "validate UPDATEs using RPKI (ROV; use --aspa for ASPA)" o.FilterIn = true o.Bidir = true @@ -112,13 +143,18 @@ func NewRpki(parent *core.StageBase) core.Stage { f.Int("retry-max", 0, "maximum number of connection retries (0 means unlimited)") f.Bool("tls", false, "connect over TLS") f.Bool("insecure", false, "do not validate TLS certificates") - f.Bool("no-ipv6", false, "avoid IPv6 if possible") - f.String("file", "", "use a ROA file instead of RTR (JSON/CSV, auto-reloaded)") - f.String("invalid", "withdraw", "action for INVALID prefixes: withdraw|filter|drop|split|keep") + f.Bool("no-ipv6", false, "avoid IPv6 for RTR server connection") + f.String("file", "", "use a VRP/ASPA file instead of RTR (JSON/CSV, auto-reloaded)") + f.String("invalid", "withdraw", "action for ROV INVALID: withdraw|filter|drop|split|keep") f.Bool("strict", false, "treat NOT_FOUND same as INVALID") - f.Bool("tag", true, "add RPKI validation status to message tags") - f.String("event", "", "emit event on RPKI INVALID messages") - f.Bool("asap", false, "do not wait for ROA cache to become ready") + f.Bool("tag", true, "add ROV validation status to message tags") + f.String("event", "", "emit event on ROV INVALID messages") + f.Bool("no-wait", false, "start before VRP/ASPA cache is ready") + f.Bool("aspa", false, "enable ASPA path validation (draft-ietf-sidrops-aspa-verification)") + f.String("aspa-invalid", "withdraw", "action for ASPA INVALID: withdraw|drop|keep") + f.Bool("aspa-tag", true, "add ASPA validation status to message tags") + f.String("aspa-event", "", "emit event on ASPA INVALID paths") + f.String("aspa-role", "auto", "peer BGP role: auto|provider|customer|peer|rs|rs-client") return s } @@ -126,58 +162,90 @@ func NewRpki(parent *core.StageBase) core.Stage { func (s *Rpki) Attach() error { k := s.K - // create prometheus counters and gauges + // prometheus metrics prefix := s.MetricPrefix() - s.cMessages = metrics.GetOrCreateCounter(prefix + "messages_total") - s.cValid = metrics.GetOrCreateCounter(prefix + "valid_total") - s.cInvalid = metrics.GetOrCreateCounter(prefix + "invalid_total") - s.cNotFound = metrics.GetOrCreateCounter(prefix + "not_found_total") - metrics.NewGauge(prefix+"roa4_prefixes", func() float64 { - if r4 := s.roa4.Load(); r4 != nil { - return float64(len(*r4)) + s.cnt_msg = metrics.GetOrCreateCounter(prefix + "messages_total") + s.cnt_rov_valid = metrics.GetOrCreateCounter(prefix + "rov_valid_total") + s.cnt_rov_inv = metrics.GetOrCreateCounter(prefix + "rov_invalid_total") + s.cnt_rov_nf = metrics.GetOrCreateCounter(prefix + "rov_not_found_total") + metrics.NewGauge(prefix+"vrps_ipv4", func() float64 { + if v := s.vrp4.Load(); v != nil { + return float64(len(*v)) } return 0 }) - metrics.NewGauge(prefix+"roa6_prefixes", func() float64 { - if r6 := s.roa6.Load(); r6 != nil { - return float64(len(*r6)) + metrics.NewGauge(prefix+"vrps_ipv6", func() float64 { + if v := s.vrp6.Load(); v != nil { + return float64(len(*v)) } return 0 }) - // Parse invalid action + s.aspa_on = k.Bool("aspa") + if s.aspa_on { + s.cnt_aspa_valid = metrics.GetOrCreateCounter(prefix + "aspa_valid_total") + s.cnt_aspa_unk = metrics.GetOrCreateCounter(prefix + "aspa_unknown_total") + s.cnt_aspa_inv = metrics.GetOrCreateCounter(prefix + "aspa_invalid_total") + metrics.NewGauge(prefix+"aspa_entries", func() float64 { + if a := s.aspa.Load(); a != nil { + return float64(len(*a)) + } + return 0 + }) + } + + // parse ROV action switch strings.ToLower(k.String("invalid")) { case "withdraw": - s.invalid = rpki_withdraw + s.rov_act = act_withdraw case "drop": - s.invalid = rpki_drop + s.rov_act = act_drop case "filter": - s.invalid = rpki_filter + s.rov_act = act_filter case "split": - s.invalid = rpki_split + s.rov_act = act_split case "keep": - s.invalid = rpki_keep + s.rov_act = act_keep default: return fmt.Errorf("--invalid must be withdraw, filter, drop, split or keep") } + // parse ASPA config + if s.aspa_on { + switch strings.ToLower(k.String("aspa-invalid")) { + case "withdraw": + s.aspa_act = act_withdraw + case "drop": + s.aspa_act = act_drop + case "keep": + s.aspa_act = act_keep + default: + return fmt.Errorf("--aspa-invalid must be withdraw, drop or keep") + } + s.aspa_tag = k.Bool("aspa-tag") + s.aspa_ev = k.String("aspa-event") + s.aspa_role = strings.ToLower(k.String("aspa-role")) + if s.aspa_role != "auto" { + if _, ok := parseRoleName(s.aspa_role); !ok { + return fmt.Errorf("--aspa-role must be auto, provider, customer, peer, rs or rs-client") + } + } + } + s.strict = k.Bool("strict") s.rtr = k.String("rtr") s.file = k.String("file") s.tag = k.Bool("tag") s.event = k.String("event") - // need at least one source if s.rtr == "" && s.file == "" { return fmt.Errorf("must specify --rtr or --file") } - // register callback for UPDATE messages s.P.OnMsg(s.validateMsg, s.Dir, msg.UPDATE) - // create input for --invalid=split - if s.invalid == rpki_split { - s.in_split = s.P.AddInput(s.Dir) + if s.rov_act == act_split { + s.split = s.P.AddInput(s.Dir) } return nil @@ -193,10 +261,9 @@ func (s *Rpki) Prepare() error { panic("no RPKI source configured") } - // block until the ROA cache is ready? - if !s.K.Bool("asap") { + if !s.K.Bool("no-wait") { select { - case <-s.roa_done: + case <-s.vrp_done: case <-s.Ctx.Done(): } } @@ -215,31 +282,41 @@ func (s *Rpki) Stop() error { func (s *Rpki) RouteHTTP(r chi.Router) error { r.Get("/", func(w http.ResponseWriter, req *http.Request) { - var roa4size, roa6size int - if r4 := s.roa4.Load(); r4 != nil { - roa4size = len(*r4) - } - if r6 := s.roa6.Load(); r6 != nil { - roa6size = len(*r6) - } - source := "rtr" if s.file != "" { source = "file" } - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]any{ + met := map[string]uint64{ + "messages": s.cnt_msg.Get(), + "rov_valid": s.cnt_rov_valid.Get(), + "rov_invalid": s.cnt_rov_inv.Get(), + "rov_not_found": s.cnt_rov_nf.Get(), + } + + resp := map[string]any{ "source": source, - "roa4": roa4size, - "roa6": roa6size, - "metrics": map[string]uint64{ - "messages": s.cMessages.Get(), - "valid": s.cValid.Get(), - "invalid": s.cInvalid.Get(), - "not_found": s.cNotFound.Get(), - }, - }) + "metrics": met, + } + + if v := s.vrp4.Load(); v != nil { + resp["vrps_ipv4"] = len(*v) + } + if v := s.vrp6.Load(); v != nil { + resp["vrps_ipv6"] = len(*v) + } + + if s.aspa_on { + if a := s.aspa.Load(); a != nil { + resp["aspa_entries"] = len(*a) + } + met["aspa_valid"] = s.cnt_aspa_valid.Get() + met["aspa_unknown"] = s.cnt_aspa_unk.Get() + met["aspa_invalid"] = s.cnt_aspa_inv.Get() + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) }) return nil } diff --git a/stages/rpki/rtr.go b/stages/rpki/rtr.go index 3de36e8..e3393a8 100644 --- a/stages/rpki/rtr.go +++ b/stages/rpki/rtr.go @@ -1,58 +1,82 @@ package rpki import ( + "net/netip" "time" - rtrlib "github.com/bgp/stayrtr/lib" + "github.com/bgpfix/bgpfix/rtr" "github.com/bgpfix/bgpipe/pkg/util" ) -// rtrRun runs the RTR client with reconnection logic +// rtrRun manages the RTR client connection loop with reconnection. func (s *Rpki) rtrRun() { k := s.K - config := rtrlib.ClientConfiguration{ - ProtocolVersion: rtrlib.PROTOCOL_VERSION_1, - Log: &util.Stdlog{Logger: s.Logger}, - } + client := rtr.NewClient(&rtr.Options{ + Logger: &s.Logger, + Version: rtr.VersionAuto, + + OnROA: func(add bool, prefix netip.Prefix, maxLen uint8, asn uint32) { + s.nextVRP(add, prefix, maxLen, asn) + }, - // start the refresh goroutine - go s.rtrRefresh(k.Duration("rtr-refresh")) + OnASPA: func(add bool, cas uint32, providers []uint32) { + s.nextASPA(add, cas, providers) + }, + + OnEndOfData: func(sessid uint16, serial uint32) { + s.rtr_mu.Lock() + changed := !s.rtr_has || s.rtr_serial != serial || s.rtr_sessid != sessid + s.rtr_serial = serial + s.rtr_sessid = sessid + s.rtr_has = true + s.rtr_valid = true + s.rtr_mu.Unlock() + if changed { + s.nextApply() + } + }, + + OnCacheReset: func() { + s.nextFlush() + s.rtr_mu.Lock() + s.rtr_valid = false + s.rtr_has = false + s.rtr_mu.Unlock() + }, + + OnError: func(code uint16, text string) { + if code != rtr.ErrNoData { + s.Warn().Uint16("code", code).Str("text", text).Msg("RTR error") + } else { + s.Debug().Msg("RTR no data available yet") + } + }, + }) + + go s.rtrRefresh(client, k.Duration("rtr-refresh")) for s.Ctx.Err() == nil { - // NB: measure retry time vs. dial time, to protect from - // retrying too fast if the server keeps dropping us retry := time.Now().Add(k.Duration("rtr-retry")) - // connect conn, err := util.DialRetry(s.StageBase, nil, "tcp", s.rtr) if err != nil { s.Fatal().Err(err).Msg("could not connect to RTR server") } - // make a new state - rc := rtrlib.NewClientSession(config, s) s.rtr_mu.Lock() s.rtr_conn = conn - s.rtr_client = rc - s.rtr_sessid = 0 s.rtr_valid = false - s.nextFlush() s.rtr_mu.Unlock() - // run RTR session (blocks until disconnected) - err = rc.StartWithConn(conn) + s.nextFlush() + err = client.Run(s.Ctx, conn) // NB: Run always closes conn - // clear the state s.rtr_mu.Lock() - s.rtr_conn.Close() - s.rtr_client = nil s.rtr_conn = nil - s.rtr_sessid = 0 s.rtr_valid = false s.rtr_mu.Unlock() - // report, retry if sleep := time.Until(retry); sleep > time.Second { s.Warn().Err(err).Str("addr", s.rtr).Msgf("RTR connection failed, retrying in %s", sleep.Round(time.Second)) select { @@ -65,28 +89,8 @@ func (s *Rpki) rtrRun() { } } -// rtrSessionCheck checks if the session ID has changed -func (s *Rpki) rtrSessionCheck(rc *rtrlib.ClientSession, id uint16) bool { - if s.rtr_valid && s.rtr_sessid == id { - return true - } - - if !s.rtr_valid && s.rtr_sessid == 0 { - s.Info().Uint16("id", id).Msg("RTR new session") - return true - } - - s.Info().Uint16("old", s.rtr_sessid).Uint16("new", id).Msg("RTR session changed") - s.rtr_sessid = id - s.rtr_valid = false - s.nextFlush() - rc.SendResetQuery() - - return false -} - -// rtrRefresh sends periodic Serial Query to check for updates -func (s *Rpki) rtrRefresh(interval time.Duration) { +// rtrRefresh sends periodic Serial Query to check for incremental updates. +func (s *Rpki) rtrRefresh(client *rtr.Client, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -94,101 +98,14 @@ func (s *Rpki) rtrRefresh(interval time.Duration) { select { case <-ticker.C: s.rtr_mu.Lock() - rs := s.rtr_client valid := s.rtr_valid - sessid := s.rtr_sessid - serial := s.rtr_serial s.rtr_mu.Unlock() - - if rs != nil && valid { - s.Debug(). - Uint16("session", sessid). - Uint32("serial", serial).Msg("RTR periodic refresh") - rs.SendSerialQuery(sessid, serial) + if valid { + s.Debug().Msg("RTR periodic refresh") + client.SendSerial() } case <-s.Ctx.Done(): return } } } - -// HandlePDU implements rtrlib.RTRClientSessionEventHandler -// It is called serially from the RTR client goroutine (no concurrency issues). -func (s *Rpki) HandlePDU(rc *rtrlib.ClientSession, pdu rtrlib.PDU) { - switch p := pdu.(type) { - case *rtrlib.PDUIPv4Prefix: - s.nextRoa(p.Flags == rtrlib.FLAG_ADDED, p.Prefix, p.MaxLen, p.ASN) - case *rtrlib.PDUIPv6Prefix: - s.nextRoa(p.Flags == rtrlib.FLAG_ADDED, p.Prefix, p.MaxLen, p.ASN) - - case *rtrlib.PDUEndOfData: - s.Debug(). - Uint16("session", p.SessionId). - Uint32("serial", p.SerialNumber).Msg("RTR end of data") - s.rtr_mu.Lock() - defer s.rtr_mu.Unlock() - - if s.rtr_valid && s.rtr_serial == p.SerialNumber { - return // no change - } - - s.rtr_sessid = p.SessionId - s.rtr_serial = p.SerialNumber - s.rtr_valid = true - s.nextApply() - - case *rtrlib.PDUCacheReset: - s.Info().Msg("RTR cache reset requested") - s.rtr_mu.Lock() - defer s.rtr_mu.Unlock() - - s.rtr_valid = false - s.nextFlush() - rc.SendResetQuery() - - case *rtrlib.PDUCacheResponse: - s.Debug().Uint16("session", p.SessionId).Msg("RTR cache response") - s.rtr_mu.Lock() - defer s.rtr_mu.Unlock() - - s.rtrSessionCheck(rc, p.SessionId) - - case *rtrlib.PDUSerialNotify: - s.Debug(). - Uint16("session", p.SessionId). - Uint32("serial", p.SerialNumber).Msg("RTR serial notify") - s.rtr_mu.Lock() - defer s.rtr_mu.Unlock() - - if !s.rtrSessionCheck(rc, p.SessionId) { - return // session changed, reset already sent - } else if p.SerialNumber != s.rtr_serial { - rc.SendSerialQuery(s.rtr_sessid, s.rtr_serial) - } - - case *rtrlib.PDUErrorReport: - s.Warn().Uint16("code", p.ErrorCode).Str("text", p.ErrorMsg).Msg("RTR error") - s.rtr_mu.Lock() - defer s.rtr_mu.Unlock() - - s.rtr_valid = false - s.nextFlush() - - // code 2 = "No Data Available" (eg. server still initializing); - // do not retry immediately — wait for the periodic refresh instead - if p.ErrorCode != rtrlib.PDU_ERROR_NODATA { - rc.SendResetQuery() - } - } -} - -// ClientConnected implements rtrlib.RTRClientSessionEventHandler -func (s *Rpki) ClientConnected(rc *rtrlib.ClientSession) { - s.Debug().Msg("RTR connected, requesting full cache") - rc.SendResetQuery() -} - -// ClientDisconnected implements rtrlib.RTRClientSessionEventHandler -func (s *Rpki) ClientDisconnected(rc *rtrlib.ClientSession) { - s.Debug().Msg("RTR disconnected") -} diff --git a/stages/rpki/validate.go b/stages/rpki/validate.go index 4b94ad6..95d1186 100644 --- a/stages/rpki/validate.go +++ b/stages/rpki/validate.go @@ -11,114 +11,104 @@ import ( "github.com/bgpfix/bgpfix/pipe" ) -// validatePrefix performs RPKI validation for a single prefix -func (s *Rpki) validatePrefix(roa4, roa6 ROA, p netip.Prefix, origin uint32) int { - // pick ROA cache - var roas ROA +// validatePrefix performs ROV for a single prefix against VRP caches. +func (s *Rpki) validatePrefix(v4, v6 VRPs, p netip.Prefix, origin uint32) int { + var vrps VRPs var minLen int if p.Addr().Is4() { - minLen = minROALenV4 - roas = roa4 + minLen = min_vrp_v4 + vrps = v4 } else { - minLen = minROALenV6 - roas = roa6 + minLen = min_vrp_v6 + vrps = v6 } - if len(roas) == 0 { + if len(vrps) == 0 { if s.strict { - return rpki_invalid + return rov_invalid } - return rpki_not_found + return rov_not_found } - // find covering prefixes from most- to least-specific + // find covering VRPs from most- to least-specific var found bool addr, bits := p.Addr(), uint8(p.Bits()) for try := p.Bits(); try >= minLen; try-- { p, _ := addr.Prefix(try) - for _, e := range roas[p] { + for _, e := range vrps[p] { if origin != 0 && origin == e.ASN && bits <= e.MaxLen { - return rpki_valid - } else { - found = true + return rov_valid } + found = true } } if found { - return rpki_invalid + return rov_invalid } else if s.strict { - return rpki_invalid - } else { - return rpki_not_found + return rov_invalid } + return rov_not_found } -// validateMsg is the callback for UPDATE messages +// validateMsg is the callback for UPDATE messages. func (s *Rpki) validateMsg(m *msg.Msg) bool { - s.cMessages.Inc() + s.cnt_msg.Inc() u := &m.Update tags := pipe.UseTags(m) - // get current ROA caches - roa4, roa6 := *s.roa4.Load(), *s.roa6.Load() + // current VRP caches + v4, v6 := *s.vrp4.Load(), *s.vrp6.Load() - // get origin AS from AS_PATH + // origin AS from AS_PATH origin := u.AsPath().Origin() - // check_delete checks a prefix and decides whether to delete it + // check each reachable prefix, optionally deleting invalid ones var valid, invalid, not_found []nlri.Prefix - invalid_delete := s.invalid == rpki_withdraw || s.invalid == rpki_filter || s.invalid == rpki_split - check_delete := func(p nlri.Prefix) bool { - switch s.validatePrefix(roa4, roa6, p.Prefix, origin) { - case rpki_valid: - s.cValid.Inc() + do_delete := s.rov_act == act_withdraw || s.rov_act == act_filter || s.rov_act == act_split + check := func(p nlri.Prefix) bool { + switch s.validatePrefix(v4, v6, p.Prefix, origin) { + case rov_valid: + s.cnt_rov_valid.Inc() valid = append(valid, p) if s.tag { tags["rpki/"+p.String()] = "VALID" } - return false // keep prefix + return false - case rpki_not_found: - s.cNotFound.Inc() + case rov_not_found: + s.cnt_rov_nf.Inc() not_found = append(not_found, p) if s.tag { tags["rpki/"+p.String()] = "NOT_FOUND" } - return false // keep prefix + return false - case rpki_invalid: - s.cInvalid.Inc() + case rov_invalid: + s.cnt_rov_inv.Inc() invalid = append(invalid, p) - return invalid_delete // drop prefix iff requested + return do_delete } panic("unreachable") } - // check IPv4 reachable prefixes - u.Reach = slices.DeleteFunc(u.Reach, check_delete) - - // check MP reachable prefixes - mpp := u.ReachMP().Prefixes() - if mpp != nil && mpp.Len() > 0 { - mpp.Prefixes = slices.DeleteFunc(mpp.Prefixes, check_delete) + u.Reach = slices.DeleteFunc(u.Reach, check) + if mpp := u.ReachMP().Prefixes(); mpp != nil && mpp.Len() > 0 { + mpp.Prefixes = slices.DeleteFunc(mpp.Prefixes, check) } - // act based on validation results + // act on ROV results if len(invalid) > 0 { - // message (will be) modified? - if s.tag || s.invalid != rpki_keep { + if s.tag || s.rov_act != act_keep { m.Edit() } - // split into separate message? - do_split := s.invalid == rpki_split && len(valid)+len(not_found) > 0 - m2 := m // otherwise just edit the original - t2 := tags + // split invalid prefixes into separate UPDATE? + do_split := s.rov_act == act_split && len(valid)+len(not_found) > 0 + m2, t2 := m, tags if do_split { m2 = s.P.GetMsg().Switch(msg.UPDATE) m2.Time = m.Time - t2 = pipe.UseTags(m2) for k, v := range tags { if !strings.HasPrefix(k, "rpki/") { @@ -127,7 +117,6 @@ func (s *Rpki) validateMsg(m *msg.Msg) bool { } } - // add RPKI tags? if s.tag { t2["rpki/status"] = "INVALID" for _, p := range invalid { @@ -135,46 +124,53 @@ func (s *Rpki) validateMsg(m *msg.Msg) bool { } } - // rewrite invalid prefixes to unreach? - if s.invalid == rpki_split || s.invalid == rpki_withdraw { + if s.rov_act == act_split || s.rov_act == act_withdraw { m2.Update.AddUnreach(invalid...) } - // drop attributes if no reachable prefixes left? - if invalid_delete && len(valid)+len(not_found) == 0 { + // drop attributes if no reachable prefixes left + if do_delete && len(valid)+len(not_found) == 0 { m2.Update.Attrs.Filter(attrs.ATTR_MP_UNREACH) } - // send an event? if s.event != "" { s.Event(s.event, m2) } - // drop the message? - if s.invalid == rpki_drop { + if s.rov_act == act_drop { return false - } else if s.invalid == rpki_keep { - return true - } else if do_split { - s.in_split.WriteMsg(m2) - // NB: original message will continue below - } else { - return u.HasReach() || u.HasUnreach() } - } - // if we're here, m does not contain invalid prefixes - if s.tag { + if do_split { + s.split.WriteMsg(m2) + // tag the original (valid/not-found) message so downstream filters work + if s.tag { + switch { + case len(valid) > 0: + tags["rpki/status"] = "VALID" + case len(not_found) > 0: + tags["rpki/status"] = "NOT_FOUND" + } + } + } + + if s.rov_act != act_keep && !do_split && !u.HasReach() && !u.HasUnreach() { + return false + } + } else if s.tag { switch { case len(not_found) > 0: tags["rpki/status"] = "NOT_FOUND" m.Edit() - case len(valid) > 0: tags["rpki/status"] = "VALID" m.Edit() } } + // ASPA validation (independent of ROV, requires --aspa) + if s.aspa_on { + return s.validateAspa(m, u, tags) + } return true } diff --git a/stages/rpki/validate_test.go b/stages/rpki/validate_test.go index dcc39cd..62c28aa 100644 --- a/stages/rpki/validate_test.go +++ b/stages/rpki/validate_test.go @@ -8,9 +8,9 @@ import ( func TestValidatePrefixExactMatch(t *testing.T) { s := &Rpki{} - // Setup ROA: 192.0.2.0/24-24 AS65001 - roa4 := make(ROA) - roa4[netip.MustParsePrefix("192.0.2.0/24")] = []ROAEntry{ + // VRP: 192.0.2.0/24-24 AS65001 + roa4 := make(VRPs) + roa4[netip.MustParsePrefix("192.0.2.0/24")] = []VRP{ {MaxLen: 24, ASN: 65001}, } @@ -20,9 +20,9 @@ func TestValidatePrefixExactMatch(t *testing.T) { origin uint32 want int }{ - {"exact match valid", "192.0.2.0/24", 65001, rpki_valid}, - {"exact match wrong ASN", "192.0.2.0/24", 65002, rpki_invalid}, - {"no ROA", "203.0.113.0/24", 65001, rpki_not_found}, + {"exact match valid", "192.0.2.0/24", 65001, rov_valid}, + {"exact match wrong ASN", "192.0.2.0/24", 65002, rov_invalid}, + {"no VRP", "203.0.113.0/24", 65001, rov_not_found}, } for _, tt := range tests { @@ -39,9 +39,9 @@ func TestValidatePrefixExactMatch(t *testing.T) { func TestValidatePrefixMaxLen(t *testing.T) { s := &Rpki{} - // Setup ROA: 192.0.2.0/24-26 AS65001 (allows up to /26) - roa4 := make(ROA) - roa4[netip.MustParsePrefix("192.0.2.0/24")] = []ROAEntry{ + // VRP: 192.0.2.0/24-26 AS65001 (allows up to /26) + roa4 := make(VRPs) + roa4[netip.MustParsePrefix("192.0.2.0/24")] = []VRP{ {MaxLen: 26, ASN: 65001}, } @@ -51,11 +51,11 @@ func TestValidatePrefixMaxLen(t *testing.T) { origin uint32 want int }{ - {"within maxLen /24", "192.0.2.0/24", 65001, rpki_valid}, - {"within maxLen /25", "192.0.2.0/25", 65001, rpki_valid}, - {"within maxLen /26", "192.0.2.0/26", 65001, rpki_valid}, - {"exceeds maxLen /27", "192.0.2.0/27", 65001, rpki_invalid}, - {"exceeds maxLen /28", "192.0.2.0/28", 65001, rpki_invalid}, + {"within maxLen /24", "192.0.2.0/24", 65001, rov_valid}, + {"within maxLen /25", "192.0.2.0/25", 65001, rov_valid}, + {"within maxLen /26", "192.0.2.0/26", 65001, rov_valid}, + {"exceeds maxLen /27", "192.0.2.0/27", 65001, rov_invalid}, + {"exceeds maxLen /28", "192.0.2.0/28", 65001, rov_invalid}, } for _, tt := range tests { @@ -73,10 +73,10 @@ func TestValidatePrefixCoveringROA(t *testing.T) { s := &Rpki{} s.nextFlush() // Initialize next4/next6 maps - // Setup ROA: 192.0.2.0/22-24 AS65001 (covers /22, /23, /24) - roa4 := make(ROA) + // VRP: 192.0.2.0/22-24 AS65001 (covers /22, /23, /24) + roa4 := make(VRPs) // Must use .Masked() to match how ROAs are stored in nextAdd() - roa4[netip.MustParsePrefix("192.0.2.0/22").Masked()] = []ROAEntry{ + roa4[netip.MustParsePrefix("192.0.2.0/22").Masked()] = []VRP{ {MaxLen: 24, ASN: 65001}, } @@ -86,13 +86,13 @@ func TestValidatePrefixCoveringROA(t *testing.T) { origin uint32 want int }{ - {"covered /22 valid", "192.0.2.0/22", 65001, rpki_valid}, - {"covered /23 valid", "192.0.2.0/23", 65001, rpki_valid}, - {"covered /24 valid", "192.0.2.0/24", 65001, rpki_valid}, - {"covered /24 different subnet valid", "192.0.3.0/24", 65001, rpki_valid}, - {"exceeds maxLen /25", "192.0.2.0/25", 65001, rpki_invalid}, - {"covered wrong ASN", "192.0.2.0/23", 65002, rpki_invalid}, - {"outside range", "192.0.6.0/24", 65001, rpki_not_found}, + {"covered /22 valid", "192.0.2.0/22", 65001, rov_valid}, + {"covered /23 valid", "192.0.2.0/23", 65001, rov_valid}, + {"covered /24 valid", "192.0.2.0/24", 65001, rov_valid}, + {"covered /24 different subnet valid", "192.0.3.0/24", 65001, rov_valid}, + {"exceeds maxLen /25", "192.0.2.0/25", 65001, rov_invalid}, + {"covered wrong ASN", "192.0.2.0/23", 65002, rov_invalid}, + {"outside range", "192.0.6.0/24", 65001, rov_not_found}, } for _, tt := range tests { @@ -109,9 +109,9 @@ func TestValidatePrefixCoveringROA(t *testing.T) { func TestValidatePrefixIPv6(t *testing.T) { s := &Rpki{} - // Setup ROA: 2001:db8::/32-48 AS65001 - roa6 := make(ROA) - roa6[netip.MustParsePrefix("2001:db8::/32")] = []ROAEntry{ + // VRP: 2001:db8::/32-48 AS65001 + roa6 := make(VRPs) + roa6[netip.MustParsePrefix("2001:db8::/32")] = []VRP{ {MaxLen: 48, ASN: 65001}, } @@ -121,11 +121,11 @@ func TestValidatePrefixIPv6(t *testing.T) { origin uint32 want int }{ - {"exact match", "2001:db8::/32", 65001, rpki_valid}, - {"covered /48", "2001:db8:1234::/48", 65001, rpki_valid}, - {"exceeds maxLen /64", "2001:db8:1234:5678::/64", 65001, rpki_invalid}, - {"wrong ASN", "2001:db8::/32", 65002, rpki_invalid}, - {"different prefix", "2001:db9::/32", 65001, rpki_not_found}, + {"exact match", "2001:db8::/32", 65001, rov_valid}, + {"covered /48", "2001:db8:1234::/48", 65001, rov_valid}, + {"exceeds maxLen /64", "2001:db8:1234:5678::/64", 65001, rov_invalid}, + {"wrong ASN", "2001:db8::/32", 65002, rov_invalid}, + {"different prefix", "2001:db9::/32", 65001, rov_not_found}, } for _, tt := range tests { @@ -143,8 +143,8 @@ func TestValidatePrefixMultipleROAs(t *testing.T) { s := &Rpki{} // Setup: Multiple ROAs for same prefix (MOAS scenario) - roa4 := make(ROA) - roa4[netip.MustParsePrefix("192.0.2.0/24")] = []ROAEntry{ + roa4 := make(VRPs) + roa4[netip.MustParsePrefix("192.0.2.0/24")] = []VRP{ {MaxLen: 24, ASN: 65001}, {MaxLen: 26, ASN: 65002}, {MaxLen: 24, ASN: 65003}, @@ -156,12 +156,12 @@ func TestValidatePrefixMultipleROAs(t *testing.T) { origin uint32 want int }{ - {"match AS65001", "192.0.2.0/24", 65001, rpki_valid}, - {"match AS65002 /24", "192.0.2.0/24", 65002, rpki_valid}, - {"match AS65002 /26", "192.0.2.0/26", 65002, rpki_valid}, - {"AS65001 exceeds maxLen", "192.0.2.0/25", 65001, rpki_invalid}, - {"AS65003 /24", "192.0.2.0/24", 65003, rpki_valid}, - {"no matching ASN", "192.0.2.0/24", 65999, rpki_invalid}, + {"match AS65001", "192.0.2.0/24", 65001, rov_valid}, + {"match AS65002 /24", "192.0.2.0/24", 65002, rov_valid}, + {"match AS65002 /26", "192.0.2.0/26", 65002, rov_valid}, + {"AS65001 exceeds maxLen", "192.0.2.0/25", 65001, rov_invalid}, + {"AS65003 /24", "192.0.2.0/24", 65003, rov_valid}, + {"no matching ASN", "192.0.2.0/24", 65999, rov_invalid}, } for _, tt := range tests { @@ -178,24 +178,24 @@ func TestValidatePrefixMultipleROAs(t *testing.T) { func TestValidatePrefixStrictMode(t *testing.T) { s := &Rpki{strict: true} - // Empty ROA cache - roa4 := make(ROA) + // empty VRP cache + roa4 := make(VRPs) p := netip.MustParsePrefix("192.0.2.0/24") got := s.validatePrefix(roa4, nil, p, 65001) // In strict mode, NOT_FOUND should return INVALID - if got != rpki_invalid { - t.Errorf("strict mode: got %d, want rpki_invalid", got) + if got != rov_invalid { + t.Errorf("strict mode: got %d, want rov_invalid", got) } } func TestValidatePrefixMinROALen(t *testing.T) { s := &Rpki{} - // ROA for /7 (too short, below minROALenV4) - roa4 := make(ROA) - roa4[netip.MustParsePrefix("128.0.0.0/7")] = []ROAEntry{ + // VRP for /7 (too short, below min_vrp_v4) + roa4 := make(VRPs) + roa4[netip.MustParsePrefix("128.0.0.0/7")] = []VRP{ {MaxLen: 24, ASN: 65001}, } @@ -203,7 +203,7 @@ func TestValidatePrefixMinROALen(t *testing.T) { p := netip.MustParsePrefix("128.1.0.0/24") got := s.validatePrefix(roa4, nil, p, 65001) - if got != rpki_not_found { + if got != rov_not_found { t.Errorf("should not check beyond minROALenV4, got %d", got) } } @@ -214,12 +214,12 @@ func TestValidatePrefixEmptyCache(t *testing.T) { // Nil and empty caches tests := []struct { name string - roa4 ROA - roa6 ROA + roa4 VRPs + roa6 VRPs }{ {"nil cache", nil, nil}, - {"empty map v4", ROA{}, nil}, - {"empty map v6", nil, ROA{}}, + {"empty map v4", VRPs{}, nil}, + {"empty map v6", nil, VRPs{}}, } for _, tt := range tests { @@ -230,7 +230,7 @@ func TestValidatePrefixEmptyCache(t *testing.T) { got4 := s.validatePrefix(tt.roa4, nil, p4, 65001) got6 := s.validatePrefix(nil, tt.roa6, p6, 65001) - if got4 != rpki_not_found || got6 != rpki_not_found { + if got4 != rov_not_found || got6 != rov_not_found { t.Errorf("empty cache should return NOT_FOUND") } }) diff --git a/stages/speaker.go b/stages/speaker.go index a346493..59e0811 100644 --- a/stages/speaker.go +++ b/stages/speaker.go @@ -25,6 +25,8 @@ func NewSpeaker(parent *core.StageBase) core.Stage { o.Flags.Int("asn", do.LocalASN, "local ASN, -1 means use remote ASN") o.Flags.String("id", "", "local router ID, empty means use remote-1") o.Flags.Int("hold", do.LocalHoldTime, "hold time") + o.Flags.Int("remote-asn", do.RemoteASN, "expected remote ASN, -1 means accept any") + o.Flags.Int("remote-hold", do.RemoteHoldTime, "minimum acceptable remote hold time (s)") return s } @@ -39,6 +41,8 @@ func (s *Speaker) Attach() error { so.Passive = !k.Bool("active") so.LocalASN = k.Int("asn") so.LocalHoldTime = k.Int("hold") + so.RemoteASN = k.Int("remote-asn") + so.RemoteHoldTime = k.Int("remote-hold") lid := k.String("id") if len(lid) > 0 { diff --git a/stages/websocket.go b/stages/websocket.go index 75cca33..cb7b900 100644 --- a/stages/websocket.go +++ b/stages/websocket.go @@ -3,6 +3,7 @@ package stages import ( "bytes" "context" + "crypto/subtle" "crypto/tls" "encoding/base64" "errors" @@ -33,8 +34,8 @@ type Websocket struct { tls *tls.Config // TLS config (may be nil) headers http.Header // HTTP headers - url url.URL // URL address - srv *http.Server // http server (may be nil) + url url.URL // URL address + srv *http.Server // http server (may be nil) client_conn *websocket.Conn // websocket client conn server_conn chan *websocket.Conn // websocket server conns @@ -142,6 +143,7 @@ func (s *Websocket) Attach() error { } cred = make([]byte, 128) n, err := fh.Read(cred) + fh.Close() if err != nil { return fmt.Errorf("--auth: file %s: %w", v, err) } @@ -189,6 +191,7 @@ func (s *Websocket) prepareClient() error { s.Info(). Interface("headers", resp.Header). Msgf("connected %s -> %s", conn.LocalAddr(), conn.RemoteAddr()) + conn.SetReadLimit(65535) s.client_conn = conn return nil // success } else if !s.retry || (s.retry_max > 0 && try >= s.retry_max) { @@ -218,11 +221,11 @@ func (s *Websocket) prepareServer() error { // prepare listener s.srv = &http.Server{ - // TODO: ErrorLog? - Handler: mux, - Addr: s.url.Host, - BaseContext: func(l net.Listener) context.Context { return s.Ctx }, - TLSConfig: s.tls, + Handler: mux, + Addr: s.url.Host, + BaseContext: func(l net.Listener) context.Context { return s.Ctx }, + TLSConfig: s.tls, + ReadHeaderTimeout: 5 * time.Second, } // ok go! @@ -250,7 +253,7 @@ func (s *Websocket) serverHandle(w http.ResponseWriter, r *http.Request) { // require authorization? if auth := headers.Get("Authorization"); len(auth) > 0 { - if r.Header.Get("Authorization") != auth { + if subtle.ConstantTimeCompare([]byte(r.Header.Get("Authorization")), []byte(auth)) != 1 { s.Warn().Msgf("%s: not authorized", r.RemoteAddr) w.Header().Set("WWW-Authenticate", `Basic realm="bgpipe"`) http.Error(w, "Unauthorized", http.StatusUnauthorized) @@ -265,6 +268,17 @@ func (s *Websocket) serverHandle(w http.ResponseWriter, r *http.Request) { // websocket upgrader upgrader := &websocket.Upgrader{ HandshakeTimeout: s.timeout, + CheckOrigin: func(r *http.Request) bool { + origin := r.Header.Get("Origin") + if origin == "" { + return true + } + u, err := url.Parse(origin) + if err != nil { + return false + } + return strings.EqualFold(u.Host, r.Host) + }, } conn, err := upgrader.Upgrade(w, r, headers) if err != nil { @@ -274,6 +288,8 @@ func (s *Websocket) serverHandle(w http.ResponseWriter, r *http.Request) { s.Info().Msgf("%s: connected", r.RemoteAddr) } + conn.SetReadLimit(65535) + // publish conn for broadcasts + signal to connWriter if !util.Send(s.server_conn, conn) || !util.Send(s.eio.Output, nil) { s.Warn().Msgf("%s: could not register new connection", r.RemoteAddr)