diff --git a/AGENTS.md b/AGENTS.md index 619a252..ae33c07 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -157,7 +157,7 @@ Do not assume older docs mentioning only a subset are current. - `claw-api: self` on a service's `x-claw` block is an authority signal — it auto-generates a read-only bearer token scoped to that service and injects `CLAW_API_URL` + `CLAW_API_TOKEN`. This is separate from `surfaces: [service://claw-api]` which only grants network reachability. Both are needed for full access. - claw-api principal scopes have four dimensions: `pods`, `services` (base service names), `claw_ids` (ordinal IDs), `compose_services` (compose service names). `compose_services` is reserved for write-plane ordinal targeting (#78). Write verb constants (`fleet.restart` etc.) are defined but handlers are not yet implemented. - `sequential-conformance: true` at pod level allows services to share the same Discord handle ID (rollcall pattern). Without it, duplicate handle IDs across services are a hard error. The `count > 1` rejection is still enforced even in sequential-conformance pods. -- `claw-wall` is an infrastructure service auto-injected by `claw up` when any cllama-enabled service has Discord channel IDs. The service name `claw-wall` is reserved — declaring it in `claw-pod.yml` is a hard error. Credentials are passed as `CLAW_WALL_TOKENS=channelID:token,...` pairs (not a map); the same channel ID can appear with different tokens for multi-bot pods. Cursor state is in-memory and resets on wall restart — agents may see some message overlap after redeploy. +- `claw-wall` is an infrastructure service auto-injected by `claw up` when any cllama-enabled service has Discord channel IDs. The service name `claw-wall` is reserved — declaring it in `claw-pod.yml` is a hard error. Credentials are passed as `CLAW_WALL_TOKENS=channelID:token,...` pairs (not a map), with exactly one reader token selected per consumed channel. Cursor state is in-memory and resets on wall restart — agents may see some message overlap after redeploy. - Verb validation happens at parse time — unknown verbs in `x-claw.principals` or `principals.json` fail hard. - `claw.describe` is the structured service descriptor label. `claw up` extracts `.claw-describe.json` from images (or falls back to the build context filesystem). Descriptors declare feeds, endpoints, auth, and skill file paths. Feed names from descriptors populate a pod-global feed registry; consumers subscribe by name via short-form `feeds: [name]`. - Feed resolution is two-phase: the parser stores short-form feed names as `FeedEntry{Unresolved: true}` (no source/path validation). `claw up` resolves them after image inspection against the feed registry. Unresolved feeds that aren't in the registry are hard errors. diff --git a/cmd/claw-wall/discord.go b/cmd/claw-wall/discord.go index 83093d1..66244ba 100644 --- a/cmd/claw-wall/discord.go +++ b/cmd/claw-wall/discord.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -22,6 +23,9 @@ type discordPoller struct { targets []tokenPair fetchLimit int latestByPair map[string]string + baseURL string + cooldowns *rateLimitTracker + now func() time.Time } type discordAPIMessage struct { @@ -104,6 +108,9 @@ func newDiscordPoller(client *http.Client, store *conversationStore, targets []t targets: targets, fetchLimit: fetchLimit, latestByPair: make(map[string]string), + baseURL: "https://discord.com/api/v10", + cooldowns: newRateLimitTracker(), + now: time.Now, } } @@ -124,9 +131,20 @@ func (p *discordPoller) Run(ctx context.Context, interval time.Duration, logWrit func (p *discordPoller) pollOnce(ctx context.Context, logWriter io.Writer) { for _, target := range p.targets { + if p.cooldowns.blocked(target, p.now()) { + continue + } + latestID := p.latestByPair[pairKey(target)] messages, newestID, err := p.fetchMessages(ctx, target, latestID) if err != nil { + var rateLimitErr *discordRateLimitError + if errors.As(err, &rateLimitErr) { + if logWriter != nil && rateLimitErr.FirstOccurrence { + fmt.Fprintf(logWriter, "claw-wall: %v\n", rateLimitErr) + } + continue + } if logWriter != nil { fmt.Fprintf(logWriter, "claw-wall: poll channel %s failed: %v\n", target.ChannelID, err) } @@ -142,7 +160,12 @@ func (p *discordPoller) pollOnce(ctx context.Context, logWriter io.Writer) { } func (p *discordPoller) fetchMessages(ctx context.Context, target tokenPair, afterID string) ([]wallMessage, string, error) { - url := fmt.Sprintf("https://discord.com/api/v10/channels/%s/messages?limit=%d", target.ChannelID, p.fetchLimit) + baseURL := strings.TrimRight(strings.TrimSpace(p.baseURL), "/") + if baseURL == "" { + baseURL = "https://discord.com/api/v10" + } + + url := fmt.Sprintf("%s/channels/%s/messages?limit=%d", baseURL, target.ChannelID, p.fetchLimit) if strings.TrimSpace(afterID) != "" { url += "&after=" + afterID } @@ -160,7 +183,14 @@ func (p *discordPoller) fetchMessages(ctx context.Context, target tokenPair, aft } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + if limit := parseDiscordRateLimit(resp, body, p.now()); limit != nil { + return nil, "", &discordRateLimitError{ + Target: target, + Limit: *limit, + FirstOccurrence: p.cooldowns.record(target, *limit), + } + } return nil, "", fmt.Errorf("discord returned %s: %s", resp.Status, strings.TrimSpace(string(body))) } diff --git a/cmd/claw-wall/main.go b/cmd/claw-wall/main.go index e96fb2c..b6b9889 100644 --- a/cmd/claw-wall/main.go +++ b/cmd/claw-wall/main.go @@ -97,7 +97,7 @@ func loadConfig() (config, error) { return config{}, fmt.Errorf("claw-wall: CLAW_WALL_LIMIT must be at least 1") } - pollSeconds, err := envInt("CLAW_WALL_POLL_INTERVAL", 15) + pollSeconds, err := envInt("CLAW_WALL_POLL_INTERVAL", 30) if err != nil { return config{}, err } diff --git a/cmd/claw-wall/main_test.go b/cmd/claw-wall/main_test.go index 6941da2..1ab951b 100644 --- a/cmd/claw-wall/main_test.go +++ b/cmd/claw-wall/main_test.go @@ -1,11 +1,13 @@ package main import ( + "context" "io" "net/http" "net/http/httptest" "slices" "strings" + "sync" "testing" "time" ) @@ -100,3 +102,244 @@ func TestChannelContextHandlerReturnsBackgroundContextOnQuietTurn(t *testing.T) t.Fatalf("expected background context on quiet turn, got %q", string(secondBody)) } } + +func TestLoadConfigDefaultsPollIntervalToThirtySeconds(t *testing.T) { + t.Setenv("CLAW_WALL_ADDR", "") + t.Setenv("CLAW_WALL_LIMIT", "") + t.Setenv("CLAW_WALL_POLL_INTERVAL", "") + t.Setenv("CLAW_WALL_TOKENS", "chan-1:token-a") + + cfg, err := loadConfig() + if err != nil { + t.Fatalf("loadConfig: %v", err) + } + if cfg.PollInterval != 30*time.Second { + t.Fatalf("expected 30s poll interval, got %s", cfg.PollInterval) + } +} + +func TestLoadConfigUsesPollIntervalOverride(t *testing.T) { + t.Setenv("CLAW_WALL_ADDR", "") + t.Setenv("CLAW_WALL_LIMIT", "") + t.Setenv("CLAW_WALL_POLL_INTERVAL", "42") + t.Setenv("CLAW_WALL_TOKENS", "chan-1:token-a") + + cfg, err := loadConfig() + if err != nil { + t.Fatalf("loadConfig: %v", err) + } + if cfg.PollInterval != 42*time.Second { + t.Fatalf("expected 42s poll interval, got %s", cfg.PollInterval) + } +} + +func TestDiscordPollerChannelRateLimitSkipsOnlyBlockedPair(t *testing.T) { + store := newConversationStore(50) + current := time.Unix(100, 0) + targets := []tokenPair{ + {ChannelID: "chan-1", Token: "token-a"}, + {ChannelID: "chan-2", Token: "token-a"}, + } + + var ( + mu sync.Mutex + hits = make(map[string]int) + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + channelID := strings.Split(strings.Trim(r.URL.Path, "/"), "/")[1] + + mu.Lock() + hits[channelID]++ + hit := hits[channelID] + mu.Unlock() + + if channelID == "chan-1" && hit == 1 { + w.Header().Set("Retry-After", "60") + w.WriteHeader(http.StatusTooManyRequests) + _, _ = io.WriteString(w, `{"message":"rate limited"}`) + return + } + + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `[]`) + })) + defer server.Close() + + poller := newDiscordPoller(server.Client(), store, targets, 50) + poller.baseURL = server.URL + poller.now = func() time.Time { return current } + + var logs strings.Builder + poller.pollOnce(context.Background(), &logs) + if !strings.Contains(logs.String(), "rate limited") { + t.Fatalf("expected rate-limit log, got %q", logs.String()) + } + + logs.Reset() + poller.pollOnce(context.Background(), &logs) + if logs.Len() != 0 { + t.Fatalf("expected no log while pair is cooled down, got %q", logs.String()) + } + + mu.Lock() + defer mu.Unlock() + if hits["chan-1"] != 1 { + t.Fatalf("expected chan-1 to be skipped after cooldown, got %d hits", hits["chan-1"]) + } + if hits["chan-2"] != 2 { + t.Fatalf("expected chan-2 to keep polling, got %d hits", hits["chan-2"]) + } +} + +func TestDiscordPollerTokenRateLimitSkipsAllChannelsForToken(t *testing.T) { + store := newConversationStore(50) + current := time.Unix(100, 0) + targets := []tokenPair{ + {ChannelID: "chan-1", Token: "token-a"}, + {ChannelID: "chan-2", Token: "token-a"}, + } + + var ( + mu sync.Mutex + hits = make(map[string]int) + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + channelID := strings.Split(strings.Trim(r.URL.Path, "/"), "/")[1] + + mu.Lock() + hits[channelID]++ + hit := hits[channelID] + mu.Unlock() + + if channelID == "chan-1" && hit == 1 { + w.WriteHeader(http.StatusTooManyRequests) + _, _ = io.WriteString(w, `{"retry_after":60,"global":true}`) + return + } + + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `[]`) + })) + defer server.Close() + + poller := newDiscordPoller(server.Client(), store, targets, 50) + poller.baseURL = server.URL + poller.now = func() time.Time { return current } + + var logs strings.Builder + poller.pollOnce(context.Background(), &logs) + if !strings.Contains(logs.String(), "rate limited") { + t.Fatalf("expected rate-limit log, got %q", logs.String()) + } + + logs.Reset() + poller.pollOnce(context.Background(), &logs) + if logs.Len() != 0 { + t.Fatalf("expected no log while token is cooled down, got %q", logs.String()) + } + + mu.Lock() + defer mu.Unlock() + if hits["chan-1"] != 1 { + t.Fatalf("expected chan-1 to stop after first global 429, got %d hits", hits["chan-1"]) + } + if hits["chan-2"] != 0 { + t.Fatalf("expected chan-2 to be blocked by token cooldown, got %d hits", hits["chan-2"]) + } +} + +func TestDiscordPollerNonRateLimitFailureDoesNotCreateCooldown(t *testing.T) { + store := newConversationStore(50) + current := time.Unix(100, 0) + + var ( + mu sync.Mutex + hits = make(map[string]int) + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + channelID := strings.Split(strings.Trim(r.URL.Path, "/"), "/")[1] + + mu.Lock() + hits[channelID]++ + mu.Unlock() + + if channelID == "chan-1" { + http.Error(w, "boom", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `[]`) + })) + defer server.Close() + + poller := newDiscordPoller(server.Client(), store, []tokenPair{ + {ChannelID: "chan-1", Token: "token-a"}, + {ChannelID: "chan-2", Token: "token-a"}, + }, 50) + poller.baseURL = server.URL + poller.now = func() time.Time { return current } + + poller.pollOnce(context.Background(), io.Discard) + poller.pollOnce(context.Background(), io.Discard) + + mu.Lock() + defer mu.Unlock() + if hits["chan-1"] != 2 { + t.Fatalf("expected chan-1 to keep retrying non-429 errors, got %d hits", hits["chan-1"]) + } + if hits["chan-2"] != 2 { + t.Fatalf("expected chan-2 to keep polling, got %d hits", hits["chan-2"]) + } +} + +func TestDiscordPollerResumesPollingAfterCooldownExpires(t *testing.T) { + store := newConversationStore(50) + current := time.Unix(100, 0) + + var ( + mu sync.Mutex + hits = make(map[string]int) + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + channelID := strings.Split(strings.Trim(r.URL.Path, "/"), "/")[1] + + mu.Lock() + hits[channelID]++ + hit := hits[channelID] + mu.Unlock() + + if channelID == "chan-1" && hit == 1 { + w.Header().Set("Retry-After", "2") + w.WriteHeader(http.StatusTooManyRequests) + _, _ = io.WriteString(w, `{"message":"rate limited"}`) + return + } + + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `[]`) + })) + defer server.Close() + + poller := newDiscordPoller(server.Client(), store, []tokenPair{ + {ChannelID: "chan-1", Token: "token-a"}, + }, 50) + poller.baseURL = server.URL + poller.now = func() time.Time { return current } + + poller.pollOnce(context.Background(), io.Discard) + poller.pollOnce(context.Background(), io.Discard) + + current = current.Add(3 * time.Second) + poller.pollOnce(context.Background(), io.Discard) + + mu.Lock() + defer mu.Unlock() + if hits["chan-1"] != 2 { + t.Fatalf("expected polling to resume after cooldown expiry, got %d hits", hits["chan-1"]) + } +} diff --git a/cmd/claw-wall/ratelimit.go b/cmd/claw-wall/ratelimit.go new file mode 100644 index 0000000..522c959 --- /dev/null +++ b/cmd/claw-wall/ratelimit.go @@ -0,0 +1,197 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" +) + +const discordRateLimitFallback = 5 * time.Second + +type rateLimitScope string + +const ( + rateLimitScopeChannel rateLimitScope = "channel" + rateLimitScopeToken rateLimitScope = "token" +) + +type rateLimit struct { + Scope rateLimitScope + RetryAfter time.Duration + RecordedAt time.Time +} + +type discordRateLimitError struct { + Target tokenPair + Limit rateLimit + FirstOccurrence bool +} + +func (e *discordRateLimitError) Error() string { + return fmt.Sprintf( + "discord rate limited %s %q; retry after %s", + describeRateLimitTarget(e.Limit.Scope), + e.Target.ChannelID, + e.Limit.RetryAfter.Round(time.Millisecond), + ) +} + +type rateLimitTracker struct { + channelExpiry map[string]time.Time + tokenExpiry map[string]time.Time +} + +type discordRateLimitPayload struct { + RetryAfter float64 `json:"retry_after"` + Global bool `json:"global"` +} + +func newRateLimitTracker() *rateLimitTracker { + return &rateLimitTracker{ + channelExpiry: make(map[string]time.Time), + tokenExpiry: make(map[string]time.Time), + } +} + +func (t *rateLimitTracker) blocked(target tokenPair, now time.Time) bool { + if t == nil { + return false + } + if expiry, ok := t.tokenExpiry[target.Token]; ok { + if expiry.After(now) { + return true + } + delete(t.tokenExpiry, target.Token) + } + if expiry, ok := t.channelExpiry[pairKey(target)]; ok { + if expiry.After(now) { + return true + } + delete(t.channelExpiry, pairKey(target)) + } + return false +} + +func (t *rateLimitTracker) record(target tokenPair, limit rateLimit) bool { + if t == nil { + return false + } + + expiry := limit.RecordedAt.Add(limit.RetryAfter) + switch limit.Scope { + case rateLimitScopeToken: + if !expiry.After(t.tokenExpiry[target.Token]) { + return false + } + t.tokenExpiry[target.Token] = expiry + return true + default: + key := pairKey(target) + if !expiry.After(t.channelExpiry[key]) { + return false + } + t.channelExpiry[key] = expiry + return true + } +} + +func parseDiscordRateLimit(resp *http.Response, body []byte, now time.Time) *rateLimit { + if resp == nil || resp.StatusCode != http.StatusTooManyRequests { + return nil + } + + scope := rateLimitScopeChannel + if strings.EqualFold(strings.TrimSpace(resp.Header.Get("X-RateLimit-Scope")), "global") { + scope = rateLimitScopeToken + } + + durations := make([]time.Duration, 0, 3) + if duration, ok := parseRetryAfterHeader(resp.Header.Get("Retry-After"), now); ok { + durations = append(durations, duration) + } + if duration, ok := parseSecondsDuration(resp.Header.Get("X-RateLimit-Reset-After")); ok { + durations = append(durations, duration) + } + + var payload discordRateLimitPayload + if len(body) > 0 && json.Unmarshal(body, &payload) == nil { + if duration, ok := secondsDuration(payload.RetryAfter); ok { + durations = append(durations, duration) + } + if payload.Global { + scope = rateLimitScopeToken + } + } + + retryAfter := maxDuration(durations...) + if retryAfter <= 0 { + retryAfter = discordRateLimitFallback + } + + return &rateLimit{ + Scope: scope, + RetryAfter: retryAfter, + RecordedAt: now, + } +} + +func parseRetryAfterHeader(raw string, now time.Time) (time.Duration, bool) { + if duration, ok := parseSecondsDuration(raw); ok { + return duration, true + } + + value := strings.TrimSpace(raw) + if value == "" { + return 0, false + } + at, err := http.ParseTime(value) + if err != nil { + return 0, false + } + duration := at.Sub(now) + if duration <= 0 { + return 0, false + } + return duration, true +} + +func parseSecondsDuration(raw string) (time.Duration, bool) { + value := strings.TrimSpace(raw) + if value == "" { + return 0, false + } + seconds, err := strconv.ParseFloat(value, 64) + if err != nil { + return 0, false + } + return secondsDuration(seconds) +} + +func secondsDuration(seconds float64) (time.Duration, bool) { + if seconds <= 0 { + return 0, false + } + return time.Duration(seconds * float64(time.Second)), true +} + +func maxDuration(values ...time.Duration) time.Duration { + var longest time.Duration + for _, value := range values { + if value > longest { + longest = value + } + } + return longest +} + +func describeRateLimitTarget(scope rateLimitScope) string { + switch scope { + case rateLimitScopeToken: + return "token" + default: + return "channel" + } +} diff --git a/cmd/claw-wall/ratelimit_test.go b/cmd/claw-wall/ratelimit_test.go new file mode 100644 index 0000000..aa2ea11 --- /dev/null +++ b/cmd/claw-wall/ratelimit_test.go @@ -0,0 +1,151 @@ +package main + +import ( + "net/http" + "testing" + "time" +) + +func TestParseDiscordRateLimitHeaderOnly(t *testing.T) { + now := time.Unix(100, 0) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{"Retry-After": []string{"2.5"}}, + } + + limit := parseDiscordRateLimit(resp, nil, now) + if limit == nil { + t.Fatal("expected rate limit") + } + if limit.Scope != rateLimitScopeChannel { + t.Fatalf("expected channel scope, got %q", limit.Scope) + } + if limit.RetryAfter != 2500*time.Millisecond { + t.Fatalf("expected 2.5s retry, got %s", limit.RetryAfter) + } +} + +func TestParseDiscordRateLimitBodyOnly(t *testing.T) { + now := time.Unix(100, 0) + resp := &http.Response{StatusCode: http.StatusTooManyRequests, Header: make(http.Header)} + + limit := parseDiscordRateLimit(resp, []byte(`{"retry_after":1.25}`), now) + if limit == nil { + t.Fatal("expected rate limit") + } + if limit.RetryAfter != 1250*time.Millisecond { + t.Fatalf("expected 1.25s retry, got %s", limit.RetryAfter) + } +} + +func TestParseDiscordRateLimitPrefersLongestRetryHint(t *testing.T) { + now := time.Unix(100, 0) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: make(http.Header), + } + resp.Header.Set("Retry-After", "1") + resp.Header.Set("X-RateLimit-Reset-After", "2.75") + + limit := parseDiscordRateLimit(resp, []byte(`{"retry_after":2.5}`), now) + if limit == nil { + t.Fatal("expected rate limit") + } + if limit.RetryAfter != 2750*time.Millisecond { + t.Fatalf("expected longest retry hint, got %s", limit.RetryAfter) + } +} + +func TestParseDiscordRateLimitUsesGlobalScopeFromBody(t *testing.T) { + now := time.Unix(100, 0) + resp := &http.Response{StatusCode: http.StatusTooManyRequests, Header: make(http.Header)} + + limit := parseDiscordRateLimit(resp, []byte(`{"retry_after":1,"global":true}`), now) + if limit == nil { + t.Fatal("expected rate limit") + } + if limit.Scope != rateLimitScopeToken { + t.Fatalf("expected token scope, got %q", limit.Scope) + } +} + +func TestParseDiscordRateLimitUsesGlobalScopeFromHeader(t *testing.T) { + now := time.Unix(100, 0) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: make(http.Header), + } + resp.Header.Set("Retry-After", "3") + resp.Header.Set("X-RateLimit-Scope", "global") + + limit := parseDiscordRateLimit(resp, nil, now) + if limit == nil { + t.Fatal("expected rate limit") + } + if limit.Scope != rateLimitScopeToken { + t.Fatalf("expected token scope, got %q", limit.Scope) + } +} + +func TestParseDiscordRateLimitFallsBackWhenTimingMissing(t *testing.T) { + now := time.Unix(100, 0) + resp := &http.Response{StatusCode: http.StatusTooManyRequests, Header: make(http.Header)} + + limit := parseDiscordRateLimit(resp, []byte(`{"message":"rate limited"}`), now) + if limit == nil { + t.Fatal("expected rate limit") + } + if limit.RetryAfter != discordRateLimitFallback { + t.Fatalf("expected fallback retry, got %s", limit.RetryAfter) + } +} + +func TestParseDiscordRateLimitIgnoresNon429(t *testing.T) { + resp := &http.Response{StatusCode: http.StatusForbidden, Header: make(http.Header)} + if limit := parseDiscordRateLimit(resp, []byte(`{"retry_after":1}`), time.Unix(100, 0)); limit != nil { + t.Fatalf("expected nil rate limit, got %+v", limit) + } +} + +func TestRateLimitTrackerBlocksOnlyMatchingChannel(t *testing.T) { + tracker := newRateLimitTracker() + now := time.Unix(100, 0) + tracker.record(tokenPair{ChannelID: "chan-1", Token: "token-a"}, rateLimit{ + Scope: rateLimitScopeChannel, + RetryAfter: 5 * time.Second, + RecordedAt: now, + }) + + if !tracker.blocked(tokenPair{ChannelID: "chan-1", Token: "token-a"}, now.Add(time.Second)) { + t.Fatal("expected matching channel to be blocked") + } + if tracker.blocked(tokenPair{ChannelID: "chan-2", Token: "token-a"}, now.Add(time.Second)) { + t.Fatal("did not expect other channel on same token to be blocked") + } + if tracker.blocked(tokenPair{ChannelID: "chan-1", Token: "token-b"}, now.Add(time.Second)) { + t.Fatal("did not expect same channel on other token to be blocked") + } + if tracker.blocked(tokenPair{ChannelID: "chan-1", Token: "token-a"}, now.Add(6*time.Second)) { + t.Fatal("expected block to expire") + } +} + +func TestRateLimitTrackerBlocksAllChannelsForToken(t *testing.T) { + tracker := newRateLimitTracker() + now := time.Unix(100, 0) + tracker.record(tokenPair{ChannelID: "chan-1", Token: "token-a"}, rateLimit{ + Scope: rateLimitScopeToken, + RetryAfter: 5 * time.Second, + RecordedAt: now, + }) + + if !tracker.blocked(tokenPair{ChannelID: "chan-1", Token: "token-a"}, now.Add(time.Second)) { + t.Fatal("expected matching token/channel to be blocked") + } + if !tracker.blocked(tokenPair{ChannelID: "chan-2", Token: "token-a"}, now.Add(time.Second)) { + t.Fatal("expected other channel on same token to be blocked") + } + if tracker.blocked(tokenPair{ChannelID: "chan-2", Token: "token-b"}, now.Add(time.Second)) { + t.Fatal("did not expect other token to be blocked") + } +} diff --git a/cmd/claw/compose_up.go b/cmd/claw/compose_up.go index b90109c..b607cfa 100644 --- a/cmd/claw/compose_up.go +++ b/cmd/claw/compose_up.go @@ -42,6 +42,7 @@ const ( conversationWallFeedName = "channel-context" conversationWallFeedTTL = 30 conversationWallFeedLimit = 20 + conversationWallPollInterval = "30" conversationWallInternalPort = "8080" conversationWallDockerfile = "dockerfiles/claw-wall/Dockerfile" clawInternalNetworkName = "claw-internal" @@ -1464,17 +1465,22 @@ type conversationWallTokenPair struct { Token string } +type conversationWallTokenCandidate struct { + ServiceName string + Token string +} + func injectConversationWall(p *pod.Pod, resolvedClaws map[string]*driver.ResolvedClaw) error { if p == nil { return nil } - tokenPairs := make([]conversationWallTokenPair, 0) + consumedChannelSet := make(map[string]struct{}) triggerServices := make(map[string][]string) for _, name := range sortedResolvedClawNames(resolvedClaws) { rc := resolvedClaws[name] - if rc == nil { + if rc == nil || len(rc.Cllama) == 0 { continue } svc := p.Services[name] @@ -1487,19 +1493,8 @@ func injectConversationWall(p *pod.Pod, resolvedClaws map[string]*driver.Resolve continue } - token := strings.TrimSpace(svc.Environment["DISCORD_BOT_TOKEN"]) - if token == "" { - return fmt.Errorf("service %q: HANDLE discord with channel IDs requires DISCORD_BOT_TOKEN for conversation wall injection", name) - } for _, channelID := range channelIDs { - tokenPairs = append(tokenPairs, conversationWallTokenPair{ - ChannelID: channelID, - Token: token, - }) - } - - if len(rc.Cllama) == 0 { - continue + consumedChannelSet[channelID] = struct{}{} } triggerServices[name] = channelIDs } @@ -1510,9 +1505,41 @@ func injectConversationWall(p *pod.Pod, resolvedClaws map[string]*driver.Resolve if _, exists := p.Services[conversationWallServiceName]; exists { return fmt.Errorf("service name %q is reserved for the conversation wall sidecar", conversationWallServiceName) } - if len(tokenPairs) == 0 { - return fmt.Errorf("conversation wall injection triggered but no Discord channel/token pairs were found") + + consumedChannelIDs := sortedConversationWallChannelIDs(consumedChannelSet) + candidatesByChannel := make(map[string][]conversationWallTokenCandidate, len(consumedChannelIDs)) + for _, name := range sortedConversationWallServiceNames(p.Services) { + svc := p.Services[name] + if svc == nil || svc.Claw == nil { + continue + } + token := strings.TrimSpace(svc.Environment["DISCORD_BOT_TOKEN"]) + if token == "" { + continue + } + for _, channelID := range discordHandleChannelIDs(svc.Claw.Handles) { + if _, consumed := consumedChannelSet[channelID]; !consumed { + continue + } + candidatesByChannel[channelID] = append(candidatesByChannel[channelID], conversationWallTokenCandidate{ + ServiceName: name, + Token: token, + }) + } } + + tokenPairs := make([]conversationWallTokenPair, 0, len(consumedChannelIDs)) + for _, channelID := range consumedChannelIDs { + token, err := selectConversationWallToken(channelID, p.Master, candidatesByChannel[channelID]) + if err != nil { + return err + } + tokenPairs = append(tokenPairs, conversationWallTokenPair{ + ChannelID: channelID, + Token: token, + }) + } + for name, channelIDs := range triggerServices { svc := p.Services[name] if svc == nil || svc.Claw == nil { @@ -1522,9 +1549,12 @@ func injectConversationWall(p *pod.Pod, resolvedClaws map[string]*driver.Resolve } p.Services[conversationWallServiceName] = &pod.Service{ - Image: resolveConversationWallImageRef(), - Environment: map[string]string{"CLAW_WALL_TOKENS": formatConversationWallTokenPairs(tokenPairs)}, - Expose: []string{conversationWallInternalPort}, + Image: resolveConversationWallImageRef(), + Environment: map[string]string{ + "CLAW_WALL_TOKENS": formatConversationWallTokenPairs(tokenPairs), + "CLAW_WALL_POLL_INTERVAL": envOrDefault("CLAW_WALL_POLL_INTERVAL", conversationWallPollInterval), + }, + Expose: []string{conversationWallInternalPort}, Compose: map[string]interface{}{ "networks": []string{"claw-internal"}, "restart": "on-failure", @@ -1544,6 +1574,36 @@ func injectConversationWall(p *pod.Pod, resolvedClaws map[string]*driver.Resolve return nil } +func selectConversationWallToken(channelID, master string, candidates []conversationWallTokenCandidate) (string, error) { + if len(candidates) == 0 { + return "", fmt.Errorf("conversation wall injection triggered but channel %q has no eligible Discord reader token", channelID) + } + for _, candidate := range candidates { + if candidate.ServiceName == master { + return candidate.Token, nil + } + } + return candidates[0].Token, nil +} + +func sortedConversationWallChannelIDs(channels map[string]struct{}) []string { + channelIDs := make([]string, 0, len(channels)) + for channelID := range channels { + channelIDs = append(channelIDs, channelID) + } + sort.Strings(channelIDs) + return channelIDs +} + +func sortedConversationWallServiceNames(services map[string]*pod.Service) []string { + names := make([]string, 0, len(services)) + for name := range services { + names = append(names, name) + } + sort.Strings(names) + return names +} + func appendConversationWallFeed(feeds []pod.FeedEntry, channelIDs []string) []pod.FeedEntry { path := fmt.Sprintf("/channel-context?consumer={claw_id}&channels=%s&limit=%d", strings.Join(channelIDs, ","), conversationWallFeedLimit) for _, feed := range feeds { diff --git a/cmd/claw/compose_up_test.go b/cmd/claw/compose_up_test.go index 303182c..086d566 100644 --- a/cmd/claw/compose_up_test.go +++ b/cmd/claw/compose_up_test.go @@ -3096,45 +3096,40 @@ func TestBuildServiceSurfaceInfoOmitsEndpointsWhenToolsDeclared(t *testing.T) { } } +func testConversationWallService(token string, channelIDs ...string) *pod.Service { + environment := make(map[string]string) + if token != "" { + environment["DISCORD_BOT_TOKEN"] = token + } + + channels := make([]driver.ChannelInfo, 0, len(channelIDs)) + for _, channelID := range channelIDs { + channels = append(channels, driver.ChannelInfo{ID: channelID}) + } + + return &pod.Service{ + Environment: environment, + Claw: &pod.ClawBlock{ + Handles: map[string]*driver.HandleInfo{ + "discord": { + Guilds: []driver.GuildInfo{{ + ID: "guild-1", + Channels: channels, + }}, + }, + }, + }, + } +} + func TestInjectConversationWallAddsServiceAndFeed(t *testing.T) { + t.Setenv("CLAW_WALL_POLL_INTERVAL", "") + p := &pod.Pod{ Name: "desk", Services: map[string]*pod.Service{ - "observer": { - Environment: map[string]string{ - "DISCORD_BOT_TOKEN": "${OBSERVER_DISCORD_BOT_TOKEN}", - }, - Claw: &pod.ClawBlock{ - Handles: map[string]*driver.HandleInfo{ - "discord": { - Guilds: []driver.GuildInfo{{ - ID: "guild-1", - Channels: []driver.ChannelInfo{ - {ID: "chan-2"}, - }, - }}, - }, - }, - }, - }, - "trader": { - Environment: map[string]string{ - "DISCORD_BOT_TOKEN": "${TRADER_DISCORD_BOT_TOKEN}", - }, - Claw: &pod.ClawBlock{ - Handles: map[string]*driver.HandleInfo{ - "discord": { - Guilds: []driver.GuildInfo{{ - ID: "guild-1", - Channels: []driver.ChannelInfo{ - {ID: "chan-2"}, - {ID: "chan-1"}, - }, - }}, - }, - }, - }, - }, + "observer": testConversationWallService("${OBSERVER_DISCORD_BOT_TOKEN}", "chan-2", "chan-9"), + "trader": testConversationWallService("${TRADER_DISCORD_BOT_TOKEN}", "chan-2", "chan-1"), }, } @@ -3157,9 +3152,15 @@ func TestInjectConversationWallAddsServiceAndFeed(t *testing.T) { if !slices.Equal(wall.Expose, []string{conversationWallInternalPort}) { t.Fatalf("expected claw-wall expose %v, got %v", []string{conversationWallInternalPort}, wall.Expose) } - if wall.Environment["CLAW_WALL_TOKENS"] != "chan-1:${TRADER_DISCORD_BOT_TOKEN},chan-2:${OBSERVER_DISCORD_BOT_TOKEN},chan-2:${TRADER_DISCORD_BOT_TOKEN}" { + if wall.Environment["CLAW_WALL_TOKENS"] != "chan-1:${TRADER_DISCORD_BOT_TOKEN},chan-2:${OBSERVER_DISCORD_BOT_TOKEN}" { t.Fatalf("unexpected CLAW_WALL_TOKENS: %q", wall.Environment["CLAW_WALL_TOKENS"]) } + if strings.Contains(wall.Environment["CLAW_WALL_TOKENS"], "chan-9") { + t.Fatalf("expected unconsumed channel to be excluded, got %q", wall.Environment["CLAW_WALL_TOKENS"]) + } + if wall.Environment["CLAW_WALL_POLL_INTERVAL"] != conversationWallPollInterval { + t.Fatalf("unexpected CLAW_WALL_POLL_INTERVAL: %q", wall.Environment["CLAW_WALL_POLL_INTERVAL"]) + } traderFeeds := p.Services["trader"].Claw.Feeds if len(traderFeeds) != 1 { @@ -3176,20 +3177,92 @@ func TestInjectConversationWallAddsServiceAndFeed(t *testing.T) { } } +func TestInjectConversationWallPrefersMasterTokenWhenMasterDeclaresChannel(t *testing.T) { + p := &pod.Pod{ + Name: "desk", + Master: "zmaster", + Services: map[string]*pod.Service{ + "observer": testConversationWallService("${OBSERVER_DISCORD_BOT_TOKEN}", "chan-2"), + "trader": testConversationWallService("${TRADER_DISCORD_BOT_TOKEN}", "chan-2"), + "zmaster": testConversationWallService("${MASTER_DISCORD_BOT_TOKEN}", "chan-2"), + }, + } + + resolvedClaws := map[string]*driver.ResolvedClaw{ + "observer": {ServiceName: "observer"}, + "trader": {ServiceName: "trader", Cllama: []string{"passthrough"}}, + "zmaster": {ServiceName: "zmaster"}, + } + + if err := injectConversationWall(p, resolvedClaws); err != nil { + t.Fatalf("injectConversationWall: %v", err) + } + + wall := p.Services[conversationWallServiceName] + if wall == nil { + t.Fatal("expected claw-wall service to be injected") + } + if wall.Environment["CLAW_WALL_TOKENS"] != "chan-2:${MASTER_DISCORD_BOT_TOKEN}" { + t.Fatalf("unexpected CLAW_WALL_TOKENS: %q", wall.Environment["CLAW_WALL_TOKENS"]) + } +} + +func TestInjectConversationWallFallsBackWhenMasterDoesNotDeclareChannel(t *testing.T) { + p := &pod.Pod{ + Name: "desk", + Master: "zmaster", + Services: map[string]*pod.Service{ + "observer": testConversationWallService("${OBSERVER_DISCORD_BOT_TOKEN}", "chan-2"), + "trader": testConversationWallService("", "chan-2"), + "zmaster": testConversationWallService("${MASTER_DISCORD_BOT_TOKEN}", "chan-9"), + }, + } + + resolvedClaws := map[string]*driver.ResolvedClaw{ + "observer": {ServiceName: "observer"}, + "trader": {ServiceName: "trader", Cllama: []string{"passthrough"}}, + "zmaster": {ServiceName: "zmaster"}, + } + + if err := injectConversationWall(p, resolvedClaws); err != nil { + t.Fatalf("injectConversationWall: %v", err) + } + + wall := p.Services[conversationWallServiceName] + if wall == nil { + t.Fatal("expected claw-wall service to be injected") + } + if wall.Environment["CLAW_WALL_TOKENS"] != "chan-2:${OBSERVER_DISCORD_BOT_TOKEN}" { + t.Fatalf("unexpected CLAW_WALL_TOKENS: %q", wall.Environment["CLAW_WALL_TOKENS"]) + } +} + +func TestInjectConversationWallRejectsConsumedChannelWithoutEligibleReader(t *testing.T) { + p := &pod.Pod{ + Name: "desk", + Services: map[string]*pod.Service{ + "observer": testConversationWallService("${OBSERVER_DISCORD_BOT_TOKEN}", "chan-9"), + "trader": testConversationWallService("", "chan-2"), + }, + } + + err := injectConversationWall(p, map[string]*driver.ResolvedClaw{ + "observer": {ServiceName: "observer"}, + "trader": {ServiceName: "trader", Cllama: []string{"passthrough"}}, + }) + if err == nil { + t.Fatal("expected missing-reader error") + } + if !strings.Contains(err.Error(), "chan-2") { + t.Fatalf("expected channel ID in error, got %v", err) + } +} + func TestInjectConversationWallRejectsReservedServiceName(t *testing.T) { p := &pod.Pod{ Services: map[string]*pod.Service{ conversationWallServiceName: {Image: "busybox"}, - "trader": { - Environment: map[string]string{"DISCORD_BOT_TOKEN": "token"}, - Claw: &pod.ClawBlock{ - Handles: map[string]*driver.HandleInfo{ - "discord": { - Guilds: []driver.GuildInfo{{Channels: []driver.ChannelInfo{{ID: "chan-1"}}}}, - }, - }, - }, - }, + "trader": testConversationWallService("token", "chan-1"), }, } diff --git a/docs/plans/2026-04-12-claw-wall-rate-limit-fix.md b/docs/plans/2026-04-12-claw-wall-rate-limit-fix.md new file mode 100644 index 0000000..4f42b13 --- /dev/null +++ b/docs/plans/2026-04-12-claw-wall-rate-limit-fix.md @@ -0,0 +1,207 @@ +# claw-wall Discord Rate Limit Fix — Implementation Plan + +**Issue:** mostlydev/clawdapus#147 + +## Goal + +Prevent `claw-wall` from triggering Discord global rate limits by: + +1. Polling each consumed channel with exactly one reader token. +2. Honoring Discord `429` backoff instead of retrying on the next tick. +3. Reducing steady-state poll pressure with an explicit wall poll interval. + +## Design Constraints + +- Fix the current compile-time bug in `cmd/claw/compose_up.go` where wall readers are derived from all Discord-handled services before the `cllama` check. +- Keep runtime retry behavior narrow: this change is about `429` handling, not generic retry middleware. +- Preserve deterministic output in `compose.generated.yml`. +- Keep operator behavior obvious: + - `claw up` should fail if a consumed channel has no eligible reader token. + - non-`429` Discord failures should stay visible and should not create cooldown state. + +## Non-Goals + +- Do not add generic backoff for `401`, `403`, `5xx`, or network errors. +- Do not introduce multiple wall reader tokens per channel as a fallback. +- Do not invent new CLI flags for this work. + +## Target Behavior + +### Compile-Time + +- `claw-wall` is injected only when at least one `cllama` service consumes Discord channel context. +- The wall polls the union of Discord channel IDs declared by `cllama` services only. +- For each consumed channel, `claw up` picks exactly one reader token. +- Reader selection order: + 1. Master service token, if `p.Master` is set and the master actually declares that channel. + 2. Otherwise the lexicographically first eligible service name that declares the channel and has `DISCORD_BOT_TOKEN`. +- A non-`cllama` service may serve as the reader for a consumed channel, but it must not add extra channels that no `cllama` service consumes. +- If a consumed channel has no eligible reader token, `claw up` fails with the channel ID in the error. +- The injected wall service sets both: + - `CLAW_WALL_TOKENS` + - `CLAW_WALL_POLL_INTERVAL` +- Default wall poll interval becomes `30` seconds unless the host environment already sets `CLAW_WALL_POLL_INTERVAL`. + +### Runtime + +- A local `429` blocks only that token/channel pair until the backoff expires. +- A global or token-wide `429` blocks all channels using that token until the backoff expires. +- `pollOnce()` skips cooled-down targets without calling Discord again. +- Cooldown creation is logged once when the `429` is recorded. +- Skipped polls during the cooldown window are silent. +- Non-`429` failures are still logged and retried on the next normal tick. + +## Implementation Outline + +## Task 1: Rewrite compile-time channel and token selection + +**Files** + +- `cmd/claw/compose_up.go` +- `cmd/claw/compose_up_test.go` + +### Required structure in `injectConversationWall()` + +1. Pass 1: build `consumedChannels` from resolved claws with `len(rc.Cllama) > 0`. +2. In the same pass, build `triggerServices` for feed injection. +3. Pass 2: scan all services for eligible reader candidates: + - service declares the consumed channel + - service has `DISCORD_BOT_TOKEN` +4. Select one reader token per consumed channel using the precedence rules above. +5. Inject `claw-wall` with one `channel:token` pair per consumed channel. +6. Inject `CLAW_WALL_POLL_INTERVAL`, using host env override first and otherwise `"30"`. + +### Test requirements + +Replace the old expectation that allowed duplicate `chan-2` entries. The test set should cover: + +- Consumer-only channel selection: + - a non-`cllama` service shares one consumed channel and also has an extra unconsumed channel + - only consumed channels appear in `CLAW_WALL_TOKENS` +- One reader token per channel. +- Master preference when the master declares the shared channel. +- Correct fallback when the master exists but does not declare a consumed channel. +- Hard failure when a consumed channel has no eligible reader. +- Existing reserved-name rejection still passes. +- `CLAW_WALL_POLL_INTERVAL` is injected. + +### Notes + +- Keep the output deterministic by sorting consumed channel IDs and candidate service names. +- Do not require `DISCORD_BOT_TOKEN` on services that are not needed as readers. +- Feed injection remains only for the `cllama` services that triggered wall injection. + +## Task 2: Add typed 429 parsing and cooldown tracking + +**Files** + +- `cmd/claw-wall/ratelimit.go` +- `cmd/claw-wall/ratelimit_test.go` + +### Required runtime types + +- A typed `rateLimit` value with: + - scope + - retry duration + - recorded-at time +- A typed error for `429` responses so `pollOnce()` can distinguish rate limits from ordinary failures without string matching. +- A tracker with two scopes: + - channel scope keyed by `token + channel` + - token/global scope keyed by `token` + +### Parsing rules + +When Discord returns `429`, parse timing from all available sources: + +1. `Retry-After` +2. `X-RateLimit-Reset-After` +3. JSON body `retry_after` + +Use the largest valid duration found. If Discord omits all timing hints, use a conservative `5s` fallback. + +Determine scope from: + +- body `global: true`, or +- `X-RateLimit-Scope: global` + +Everything else is treated as local channel scope. + +### Test requirements + +- header-only `429` +- body-only `429` +- `X-RateLimit-Reset-After` +- global scope from body +- global scope from header +- fallback duration when timing data is missing +- non-`429` returns no rate-limit value +- channel-scoped block affects only that channel +- token/global block affects all channels for that token + +## Task 3: Wire cooldown tracking into the poller + +**Files** + +- `cmd/claw-wall/discord.go` +- `cmd/claw-wall/main_test.go` + +### Required changes + +- Extend `discordPoller` with: + - cooldown tracker + - optional base URL override for tests +- In `pollOnce()`: + - skip blocked targets before making a request + - log non-`429` failures normally + - log the typed rate-limit error when the cooldown is first recorded +- In `fetchMessages()`: + - detect `429` + - parse it into typed rate-limit data + - record the cooldown + - return a typed rate-limit error + +### Test requirements + +- A `429` on one channel blocks the next poll for that same channel/token pair. +- A global/token-scoped `429` on one channel blocks all channels using that token. +- A non-`429` failure does not create cooldown state. +- The second `pollOnce()` after a `429` does not hit the test server again while the cooldown is active. + +## Task 4: Verification + +Run verification in this order: + +```bash +go test ./cmd/claw-wall/... -count=1 +go test ./cmd/claw/... -count=1 +go test ./... -count=1 +go vet ./... +``` + +Then inspect generated output on a representative pod: + +1. Run a real `claw up -d` against a pod with Discord channel context. +2. Inspect `compose.generated.yml`. +3. Verify: + - `CLAW_WALL_TOKENS` has one entry per consumed channel + - unconsumed channels are absent + - `CLAW_WALL_POLL_INTERVAL` is present + +`claw up --dry-run` is not part of the current CLI surface, so do not rely on it in this work. + +## Suggested Commit Shape + +1. `test(claw): rewrite conversation wall expectations around consumer-only channels` +2. `feat(claw): select one wall reader token per consumed channel` +3. `test(claw-wall): add rate limit parser and cooldown coverage` +4. `feat(claw-wall): honor Discord 429 cooldowns` + +## Summary + +| Area | File(s) | Outcome | +|------|---------|---------| +| Compile-time selection | `cmd/claw/compose_up.go` | One reader token per consumed channel, master-aware, deterministic | +| Compose tests | `cmd/claw/compose_up_test.go` | Old duplicate-token expectation removed, new reader selection cases covered | +| Rate-limit model | `cmd/claw-wall/ratelimit.go` | Typed cooldown parsing and tracking | +| Poller runtime | `cmd/claw-wall/discord.go` | `429` backoff honored without noisy re-polling | +| Verification | tests + `compose.generated.yml` inspection | Confirms behavior against current repo surface |