Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ Do not assume older docs mentioning only a subset are current.
- `claw-api: self` on a service's `x-claw` block is an authority signal — it auto-generates a read-only bearer token scoped to that service and injects `CLAW_API_URL` + `CLAW_API_TOKEN`. This is separate from `surfaces: [service://claw-api]` which only grants network reachability. Both are needed for full access.
- claw-api principal scopes have four dimensions: `pods`, `services` (base service names), `claw_ids` (ordinal IDs), `compose_services` (compose service names). `compose_services` is reserved for write-plane ordinal targeting (#78). Write verb constants (`fleet.restart` etc.) are defined but handlers are not yet implemented.
- `sequential-conformance: true` at pod level allows services to share the same Discord handle ID (rollcall pattern). Without it, duplicate handle IDs across services are a hard error. The `count > 1` rejection is still enforced even in sequential-conformance pods.
- `claw-wall` is an infrastructure service auto-injected by `claw up` when any cllama-enabled service has Discord channel IDs. The service name `claw-wall` is reserved — declaring it in `claw-pod.yml` is a hard error. Credentials are passed as `CLAW_WALL_TOKENS=channelID:token,...` pairs (not a map); the same channel ID can appear with different tokens for multi-bot pods. Cursor state is in-memory and resets on wall restart — agents may see some message overlap after redeploy.
- `claw-wall` is an infrastructure service auto-injected by `claw up` when any cllama-enabled service has Discord channel IDs. The service name `claw-wall` is reserved — declaring it in `claw-pod.yml` is a hard error. Credentials are passed as `CLAW_WALL_TOKENS=channelID:token,...` pairs (not a map), with exactly one reader token selected per consumed channel. Cursor state is in-memory and resets on wall restart — agents may see some message overlap after redeploy.
- Verb validation happens at parse time — unknown verbs in `x-claw.principals` or `principals.json` fail hard.
- `claw.describe` is the structured service descriptor label. `claw up` extracts `.claw-describe.json` from images (or falls back to the build context filesystem). Descriptors declare feeds, endpoints, auth, and skill file paths. Feed names from descriptors populate a pod-global feed registry; consumers subscribe by name via short-form `feeds: [name]`.
- Feed resolution is two-phase: the parser stores short-form feed names as `FeedEntry{Unresolved: true}` (no source/path validation). `claw up` resolves them after image inspection against the feed registry. Unresolved feeds that aren't in the registry are hard errors.
Expand Down
34 changes: 32 additions & 2 deletions cmd/claw-wall/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -22,6 +23,9 @@ type discordPoller struct {
targets []tokenPair
fetchLimit int
latestByPair map[string]string
baseURL string
cooldowns *rateLimitTracker
now func() time.Time
}

type discordAPIMessage struct {
Expand Down Expand Up @@ -104,6 +108,9 @@ func newDiscordPoller(client *http.Client, store *conversationStore, targets []t
targets: targets,
fetchLimit: fetchLimit,
latestByPair: make(map[string]string),
baseURL: "https://discord.com/api/v10",
cooldowns: newRateLimitTracker(),
now: time.Now,
}
}

Expand All @@ -124,9 +131,20 @@ func (p *discordPoller) Run(ctx context.Context, interval time.Duration, logWrit

func (p *discordPoller) pollOnce(ctx context.Context, logWriter io.Writer) {
for _, target := range p.targets {
if p.cooldowns.blocked(target, p.now()) {
continue
}

latestID := p.latestByPair[pairKey(target)]
messages, newestID, err := p.fetchMessages(ctx, target, latestID)
if err != nil {
var rateLimitErr *discordRateLimitError
if errors.As(err, &rateLimitErr) {
if logWriter != nil && rateLimitErr.FirstOccurrence {
fmt.Fprintf(logWriter, "claw-wall: %v\n", rateLimitErr)
}
continue
}
if logWriter != nil {
fmt.Fprintf(logWriter, "claw-wall: poll channel %s failed: %v\n", target.ChannelID, err)
}
Expand All @@ -142,7 +160,12 @@ func (p *discordPoller) pollOnce(ctx context.Context, logWriter io.Writer) {
}

func (p *discordPoller) fetchMessages(ctx context.Context, target tokenPair, afterID string) ([]wallMessage, string, error) {
url := fmt.Sprintf("https://discord.com/api/v10/channels/%s/messages?limit=%d", target.ChannelID, p.fetchLimit)
baseURL := strings.TrimRight(strings.TrimSpace(p.baseURL), "/")
if baseURL == "" {
baseURL = "https://discord.com/api/v10"
}

url := fmt.Sprintf("%s/channels/%s/messages?limit=%d", baseURL, target.ChannelID, p.fetchLimit)
if strings.TrimSpace(afterID) != "" {
url += "&after=" + afterID
}
Expand All @@ -160,7 +183,14 @@ func (p *discordPoller) fetchMessages(ctx context.Context, target tokenPair, aft
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
if limit := parseDiscordRateLimit(resp, body, p.now()); limit != nil {
return nil, "", &discordRateLimitError{
Target: target,
Limit: *limit,
FirstOccurrence: p.cooldowns.record(target, *limit),
}
}
return nil, "", fmt.Errorf("discord returned %s: %s", resp.Status, strings.TrimSpace(string(body)))
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/claw-wall/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func loadConfig() (config, error) {
return config{}, fmt.Errorf("claw-wall: CLAW_WALL_LIMIT must be at least 1")
}

pollSeconds, err := envInt("CLAW_WALL_POLL_INTERVAL", 15)
pollSeconds, err := envInt("CLAW_WALL_POLL_INTERVAL", 30)
if err != nil {
return config{}, err
}
Expand Down
243 changes: 243 additions & 0 deletions cmd/claw-wall/main_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package main

import (
"context"
"io"
"net/http"
"net/http/httptest"
"slices"
"strings"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -100,3 +102,244 @@ func TestChannelContextHandlerReturnsBackgroundContextOnQuietTurn(t *testing.T)
t.Fatalf("expected background context on quiet turn, got %q", string(secondBody))
}
}

func TestLoadConfigDefaultsPollIntervalToThirtySeconds(t *testing.T) {
t.Setenv("CLAW_WALL_ADDR", "")
t.Setenv("CLAW_WALL_LIMIT", "")
t.Setenv("CLAW_WALL_POLL_INTERVAL", "")
t.Setenv("CLAW_WALL_TOKENS", "chan-1:token-a")

cfg, err := loadConfig()
if err != nil {
t.Fatalf("loadConfig: %v", err)
}
if cfg.PollInterval != 30*time.Second {
t.Fatalf("expected 30s poll interval, got %s", cfg.PollInterval)
}
}

func TestLoadConfigUsesPollIntervalOverride(t *testing.T) {
t.Setenv("CLAW_WALL_ADDR", "")
t.Setenv("CLAW_WALL_LIMIT", "")
t.Setenv("CLAW_WALL_POLL_INTERVAL", "42")
t.Setenv("CLAW_WALL_TOKENS", "chan-1:token-a")

cfg, err := loadConfig()
if err != nil {
t.Fatalf("loadConfig: %v", err)
}
if cfg.PollInterval != 42*time.Second {
t.Fatalf("expected 42s poll interval, got %s", cfg.PollInterval)
}
}

func TestDiscordPollerChannelRateLimitSkipsOnlyBlockedPair(t *testing.T) {
store := newConversationStore(50)
current := time.Unix(100, 0)
targets := []tokenPair{
{ChannelID: "chan-1", Token: "token-a"},
{ChannelID: "chan-2", Token: "token-a"},
}

var (
mu sync.Mutex
hits = make(map[string]int)
)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
channelID := strings.Split(strings.Trim(r.URL.Path, "/"), "/")[1]

mu.Lock()
hits[channelID]++
hit := hits[channelID]
mu.Unlock()

if channelID == "chan-1" && hit == 1 {
w.Header().Set("Retry-After", "60")
w.WriteHeader(http.StatusTooManyRequests)
_, _ = io.WriteString(w, `{"message":"rate limited"}`)
return
}

w.Header().Set("Content-Type", "application/json")
_, _ = io.WriteString(w, `[]`)
}))
defer server.Close()

poller := newDiscordPoller(server.Client(), store, targets, 50)
poller.baseURL = server.URL
poller.now = func() time.Time { return current }

var logs strings.Builder
poller.pollOnce(context.Background(), &logs)
if !strings.Contains(logs.String(), "rate limited") {
t.Fatalf("expected rate-limit log, got %q", logs.String())
}

logs.Reset()
poller.pollOnce(context.Background(), &logs)
if logs.Len() != 0 {
t.Fatalf("expected no log while pair is cooled down, got %q", logs.String())
}

mu.Lock()
defer mu.Unlock()
if hits["chan-1"] != 1 {
t.Fatalf("expected chan-1 to be skipped after cooldown, got %d hits", hits["chan-1"])
}
if hits["chan-2"] != 2 {
t.Fatalf("expected chan-2 to keep polling, got %d hits", hits["chan-2"])
}
}

func TestDiscordPollerTokenRateLimitSkipsAllChannelsForToken(t *testing.T) {
store := newConversationStore(50)
current := time.Unix(100, 0)
targets := []tokenPair{
{ChannelID: "chan-1", Token: "token-a"},
{ChannelID: "chan-2", Token: "token-a"},
}

var (
mu sync.Mutex
hits = make(map[string]int)
)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
channelID := strings.Split(strings.Trim(r.URL.Path, "/"), "/")[1]

mu.Lock()
hits[channelID]++
hit := hits[channelID]
mu.Unlock()

if channelID == "chan-1" && hit == 1 {
w.WriteHeader(http.StatusTooManyRequests)
_, _ = io.WriteString(w, `{"retry_after":60,"global":true}`)
return
}

w.Header().Set("Content-Type", "application/json")
_, _ = io.WriteString(w, `[]`)
}))
defer server.Close()

poller := newDiscordPoller(server.Client(), store, targets, 50)
poller.baseURL = server.URL
poller.now = func() time.Time { return current }

var logs strings.Builder
poller.pollOnce(context.Background(), &logs)
if !strings.Contains(logs.String(), "rate limited") {
t.Fatalf("expected rate-limit log, got %q", logs.String())
}

logs.Reset()
poller.pollOnce(context.Background(), &logs)
if logs.Len() != 0 {
t.Fatalf("expected no log while token is cooled down, got %q", logs.String())
}

mu.Lock()
defer mu.Unlock()
if hits["chan-1"] != 1 {
t.Fatalf("expected chan-1 to stop after first global 429, got %d hits", hits["chan-1"])
}
if hits["chan-2"] != 0 {
t.Fatalf("expected chan-2 to be blocked by token cooldown, got %d hits", hits["chan-2"])
}
}

func TestDiscordPollerNonRateLimitFailureDoesNotCreateCooldown(t *testing.T) {
store := newConversationStore(50)
current := time.Unix(100, 0)

var (
mu sync.Mutex
hits = make(map[string]int)
)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
channelID := strings.Split(strings.Trim(r.URL.Path, "/"), "/")[1]

mu.Lock()
hits[channelID]++
mu.Unlock()

if channelID == "chan-1" {
http.Error(w, "boom", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
_, _ = io.WriteString(w, `[]`)
}))
defer server.Close()

poller := newDiscordPoller(server.Client(), store, []tokenPair{
{ChannelID: "chan-1", Token: "token-a"},
{ChannelID: "chan-2", Token: "token-a"},
}, 50)
poller.baseURL = server.URL
poller.now = func() time.Time { return current }

poller.pollOnce(context.Background(), io.Discard)
poller.pollOnce(context.Background(), io.Discard)

mu.Lock()
defer mu.Unlock()
if hits["chan-1"] != 2 {
t.Fatalf("expected chan-1 to keep retrying non-429 errors, got %d hits", hits["chan-1"])
}
if hits["chan-2"] != 2 {
t.Fatalf("expected chan-2 to keep polling, got %d hits", hits["chan-2"])
}
}

func TestDiscordPollerResumesPollingAfterCooldownExpires(t *testing.T) {
store := newConversationStore(50)
current := time.Unix(100, 0)

var (
mu sync.Mutex
hits = make(map[string]int)
)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
channelID := strings.Split(strings.Trim(r.URL.Path, "/"), "/")[1]

mu.Lock()
hits[channelID]++
hit := hits[channelID]
mu.Unlock()

if channelID == "chan-1" && hit == 1 {
w.Header().Set("Retry-After", "2")
w.WriteHeader(http.StatusTooManyRequests)
_, _ = io.WriteString(w, `{"message":"rate limited"}`)
return
}

w.Header().Set("Content-Type", "application/json")
_, _ = io.WriteString(w, `[]`)
}))
defer server.Close()

poller := newDiscordPoller(server.Client(), store, []tokenPair{
{ChannelID: "chan-1", Token: "token-a"},
}, 50)
poller.baseURL = server.URL
poller.now = func() time.Time { return current }

poller.pollOnce(context.Background(), io.Discard)
poller.pollOnce(context.Background(), io.Discard)

current = current.Add(3 * time.Second)
poller.pollOnce(context.Background(), io.Discard)

mu.Lock()
defer mu.Unlock()
if hits["chan-1"] != 2 {
t.Fatalf("expected polling to resume after cooldown expiry, got %d hits", hits["chan-1"])
}
}
Loading
Loading