From 7b963fd2f67a7143b51aa3e239d0a66925bd4951 Mon Sep 17 00:00:00 2001 From: St0rmz1 Date: Mon, 30 Mar 2026 09:03:19 -0700 Subject: [PATCH 1/2] feat(kiloclaw): add server-side Stream Chat message sending --- kiloclaw/src/routes/platform.ts | 49 ++++++++++++ kiloclaw/src/stream-chat/client.test.ts | 84 ++++++++++++++++++++ kiloclaw/src/stream-chat/client.ts | 43 ++++++++++ src/lib/kiloclaw/kiloclaw-internal-client.ts | 15 ++++ src/routers/kiloclaw-router.ts | 42 ++++++++++ 5 files changed, 233 insertions(+) diff --git a/kiloclaw/src/routes/platform.ts b/kiloclaw/src/routes/platform.ts index 9dd1a62f96..59c3b452b3 100644 --- a/kiloclaw/src/routes/platform.ts +++ b/kiloclaw/src/routes/platform.ts @@ -30,6 +30,7 @@ import { deriveGatewayToken } from '../auth/gateway-token'; import { sandboxIdFromUserId } from '../auth/sandbox-id'; import { writeEvent } from '../utils/analytics'; import { deriveHttpEventName } from '../middleware/analytics'; +import { sendMessage } from '../stream-chat/client'; const GmailHistoryIdSchema = z.object({ userId: z.string().min(1), @@ -184,6 +185,8 @@ const SAFE_ERROR_PREFIXES = [ 'Cannot restore: ', // snapshot restore: bad state 'Cannot destroy: ', // destroy while restoring 'Cannot retry recovery', // force-retry-recovery guard messages + 'Stream Chat sendMessage failed', // sendMessage HTTP errors + 'Stream Chat is not set up', // no Stream Chat on this instance ]; function sanitizeError(err: unknown, operation: string): { message: string; status: number } { @@ -1194,6 +1197,52 @@ platform.get('/stream-chat-credentials', async c => { } }); +// POST /api/platform/send-chat-message +// Send a message to a KiloClaw instance's Stream Chat channel as the human user. +// The OpenClaw bot picks it up and responds as if the user typed it. +const SendChatMessageSchema = z.object({ + userId: z.string().min(1), + instanceId: z.string().uuid().optional(), + message: z.string().min(1).max(32_000), +}); + +platform.post('/send-chat-message', async c => { + const body: unknown = await c.req.json().catch(() => null); + const parsed = SendChatMessageSchema.safeParse(body); + if (!parsed.success) { + return jsonError('Invalid request body: userId and message are required', 400); + } + + const { userId, instanceId, message } = parsed.data; + c.set('userId', userId); + + const apiKey = c.env.STREAM_CHAT_API_KEY; + const apiSecret = c.env.STREAM_CHAT_API_SECRET; + if (!apiKey || !apiSecret) { + return jsonError('Stream Chat is not configured', 503); + } + + try { + // Get the sandboxId and channel info from the DO + const creds = await withDORetry( + instanceStubFactory(c.env, userId, instanceId), + stub => stub.getStreamChatCredentials(), + 'getStreamChatCredentials' + ); + + if (!creds) { + return jsonError('Stream Chat is not set up for this instance', 404); + } + + await sendMessage(apiKey, apiSecret, creds.channelId, creds.userId, message); + + return c.json({ success: true, channelId: creds.channelId }); + } catch (err) { + const { message: errMsg, status } = sanitizeError(err, 'send-chat-message'); + return jsonError(errMsg, status); + } +}); + // GET /api/platform/debug-status?userId=...&instanceId=... // Internal/admin-only debug status that includes DO destroy internals. platform.get('/debug-status', async c => { diff --git a/kiloclaw/src/stream-chat/client.test.ts b/kiloclaw/src/stream-chat/client.test.ts index a02e8d792a..002565dbf7 100644 --- a/kiloclaw/src/stream-chat/client.test.ts +++ b/kiloclaw/src/stream-chat/client.test.ts @@ -8,6 +8,7 @@ import { deactivateStreamChatUsers, reactivateStreamChatUsers, setupDefaultStreamChatChannel, + sendMessage, } from './client'; // Decode a JWT payload without verifying signature (for test assertions only). @@ -375,3 +376,86 @@ describe('setupDefaultStreamChatChannel', () => { expect(result.botUserId).toBe('bot-sandbox-new'); }); }); + +describe('sendMessage', () => { + const mockFetch = vi.fn(); + + beforeEach(() => { + vi.stubGlobal('fetch', mockFetch); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + mockFetch.mockReset(); + }); + + it('POSTs to /channels/messaging/{channelId}/message with correct payload', async () => { + mockFetch.mockResolvedValueOnce({ ok: true, status: 201 }); + + await sendMessage( + 'my-api-key', + 'my-api-secret', + 'default-sandbox-abc', + 'sandbox-abc', + 'Hello bot!' + ); + + expect(mockFetch).toHaveBeenCalledOnce(); + const [url, opts] = mockFetch.mock.calls[0] as [ + string, + RequestInit & { headers: Record }, + ]; + expect(url).toBe( + 'https://chat.stream-io-api.com/channels/messaging/default-sandbox-abc/message?api_key=my-api-key' + ); + expect(opts.method).toBe('POST'); + expect(opts.headers['Content-Type']).toBe('application/json'); + expect(opts.headers['Stream-Auth-Type']).toBe('jwt'); + // Authorization header should be a server JWT + expect(opts.headers['Authorization']).toBeDefined(); + expect(opts.headers['Authorization'].split('.')).toHaveLength(3); + + const body = JSON.parse(opts.body as string) as { message: { text: string; user_id: string } }; + expect(body.message.text).toBe('Hello bot!'); + expect(body.message.user_id).toBe('sandbox-abc'); + }); + + it('uses a server token (server: true) for authentication', async () => { + mockFetch.mockResolvedValueOnce({ ok: true, status: 201 }); + + await sendMessage('key', 'secret', 'chan-1', 'user-1', 'test'); + + const [, opts] = mockFetch.mock.calls[0] as [ + string, + RequestInit & { headers: Record }, + ]; + const payload = decodeJwtPayload(opts.headers['Authorization']); + expect(payload.server).toBe(true); + }); + + it('throws on HTTP error with status and body in the message', async () => { + mockFetch.mockResolvedValueOnce({ + ok: false, + status: 403, + text: async () => 'User is deactivated', + }); + + await expect(sendMessage('key', 'secret', 'chan-1', 'user-1', 'test')).rejects.toThrow( + 'Stream Chat sendMessage failed (403): User is deactivated' + ); + }); + + it('handles unreadable error body gracefully', async () => { + mockFetch.mockResolvedValueOnce({ + ok: false, + status: 500, + text: async () => { + throw new Error('body read error'); + }, + }); + + await expect(sendMessage('key', 'secret', 'chan-1', 'user-1', 'test')).rejects.toThrow( + 'Stream Chat sendMessage failed (500): (unreadable)' + ); + }); +}); diff --git a/kiloclaw/src/stream-chat/client.ts b/kiloclaw/src/stream-chat/client.ts index b62989abf2..b267799e35 100644 --- a/kiloclaw/src/stream-chat/client.ts +++ b/kiloclaw/src/stream-chat/client.ts @@ -252,3 +252,46 @@ export async function setupDefaultStreamChatChannel( return { apiKey, botUserId, botUserToken, channelId }; } + +/** + * Send a message to a Stream Chat channel on behalf of a user. + * + * Used to programmatically inject messages into a KiloClaw instance's chat + * channel. The message appears as if the user typed it, so the OpenClaw bot + * (listening via the openclaw-channel-streamchat plugin) processes and responds. + * + * @param apiKey Stream Chat API key + * @param apiSecret Stream Chat API secret (used to mint a server JWT) + * @param channelId Target channel ID, e.g. `default-{sandboxId}` + * @param userId The user ID to send the message as (typically the sandboxId) + * @param text Plain-text message content + */ +export async function sendMessage( + apiKey: string, + apiSecret: string, + channelId: string, + userId: string, + text: string +): Promise { + const serverToken = await createServerToken(apiSecret); + + const res = await fetch( + `${STREAM_CHAT_API_BASE}/channels/messaging/${channelId}/message?api_key=${apiKey}`, + { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Stream-Auth-Type': 'jwt', + Authorization: serverToken, + }, + body: JSON.stringify({ + message: { text, user_id: userId }, + }), + } + ); + + if (!res.ok) { + const body = await res.text().catch(() => '(unreadable)'); + throw new Error(`Stream Chat sendMessage failed (${res.status}): ${body}`); + } +} diff --git a/src/lib/kiloclaw/kiloclaw-internal-client.ts b/src/lib/kiloclaw/kiloclaw-internal-client.ts index 3b001fe89a..32ddef457c 100644 --- a/src/lib/kiloclaw/kiloclaw-internal-client.ts +++ b/src/lib/kiloclaw/kiloclaw-internal-client.ts @@ -201,6 +201,21 @@ export class KiloClawInternalClient { }); } + async sendChatMessage( + userId: string, + message: string, + instanceId?: string + ): Promise<{ success: boolean; channelId: string }> { + return this.request( + '/api/platform/send-chat-message', + { + method: 'POST', + body: JSON.stringify({ userId, message, instanceId }), + }, + { userId } + ); + } + async getDebugStatus(userId: string, instanceId?: string): Promise { const params = new URLSearchParams({ userId }); if (instanceId) params.set('instanceId', instanceId); diff --git a/src/routers/kiloclaw-router.ts b/src/routers/kiloclaw-router.ts index 04f9a291bb..eec6b12bc5 100644 --- a/src/routers/kiloclaw-router.ts +++ b/src/routers/kiloclaw-router.ts @@ -611,6 +611,48 @@ export const kiloclawRouter = createTRPCRouter({ return client.getStreamChatCredentials(ctx.user.id); }), + sendChatMessage: clawAccessProcedure + .input( + z.object({ + instanceId: z.string().uuid().optional(), + message: z.string().min(1).max(32_000), + }) + ) + .mutation(async ({ ctx, input }) => { + if (input.instanceId) { + // Explicit instanceId: verify ownership and non-destroyed + const [row] = await db + .select({ id: kiloclaw_instances.id }) + .from(kiloclaw_instances) + .where( + and( + eq(kiloclaw_instances.id, input.instanceId), + eq(kiloclaw_instances.user_id, ctx.user.id), + isNull(kiloclaw_instances.destroyed_at) + ) + ) + .limit(1); + if (!row) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: 'No active KiloClaw instance found', + }); + } + } else { + // No instanceId: verify the user has any active instance + const instance = await getActiveInstance(ctx.user.id); + if (!instance) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: 'No active KiloClaw instance found', + }); + } + } + + const client = new KiloClawInternalClient(); + return client.sendChatMessage(ctx.user.id, input.message, input.instanceId); + }), + // Instance lifecycle start: clawAccessProcedure.mutation(async ({ ctx }) => { const client = new KiloClawInternalClient(); From cfc3da79f4f58eba1687b98267541771a62057ff Mon Sep 17 00:00:00 2001 From: St0rmz1 Date: Mon, 30 Mar 2026 12:13:28 -0700 Subject: [PATCH 2/2] fix(kiloclaw): preserve HTTP status on sendMessage errors --- kiloclaw/src/stream-chat/client.test.ts | 16 ++++++++++++++++ kiloclaw/src/stream-chat/client.ts | 4 +++- src/routers/kiloclaw-router.ts | 22 +++++++++++++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/kiloclaw/src/stream-chat/client.test.ts b/kiloclaw/src/stream-chat/client.test.ts index 002565dbf7..ce0f495bcb 100644 --- a/kiloclaw/src/stream-chat/client.test.ts +++ b/kiloclaw/src/stream-chat/client.test.ts @@ -445,6 +445,22 @@ describe('sendMessage', () => { ); }); + it('preserves HTTP status on the thrown error for upstream handling', async () => { + mockFetch.mockResolvedValueOnce({ + ok: false, + status: 404, + text: async () => 'Channel not found', + }); + + try { + await sendMessage('key', 'secret', 'chan-1', 'user-1', 'test'); + expect.unreachable('should have thrown'); + } catch (err) { + expect(err).toBeInstanceOf(Error); + expect((err as Error & { status: number }).status).toBe(404); + } + }); + it('handles unreadable error body gracefully', async () => { mockFetch.mockResolvedValueOnce({ ok: false, diff --git a/kiloclaw/src/stream-chat/client.ts b/kiloclaw/src/stream-chat/client.ts index b267799e35..f0324b3730 100644 --- a/kiloclaw/src/stream-chat/client.ts +++ b/kiloclaw/src/stream-chat/client.ts @@ -292,6 +292,8 @@ export async function sendMessage( if (!res.ok) { const body = await res.text().catch(() => '(unreadable)'); - throw new Error(`Stream Chat sendMessage failed (${res.status}): ${body}`); + throw Object.assign(new Error(`Stream Chat sendMessage failed (${res.status}): ${body}`), { + status: res.status, + }); } } diff --git a/src/routers/kiloclaw-router.ts b/src/routers/kiloclaw-router.ts index eec6b12bc5..5b086fcad0 100644 --- a/src/routers/kiloclaw-router.ts +++ b/src/routers/kiloclaw-router.ts @@ -650,7 +650,27 @@ export const kiloclawRouter = createTRPCRouter({ } const client = new KiloClawInternalClient(); - return client.sendChatMessage(ctx.user.id, input.message, input.instanceId); + try { + return await client.sendChatMessage(ctx.user.id, input.message, input.instanceId); + } catch (err) { + if (err instanceof KiloClawApiError) { + const { message } = getKiloClawApiErrorPayload(err); + const code = + err.statusCode === 404 + ? 'NOT_FOUND' + : err.statusCode === 503 + ? 'PRECONDITION_FAILED' + : 'INTERNAL_SERVER_ERROR'; + throw new TRPCError({ + code, + message: message ?? 'Failed to send chat message', + }); + } + throw new TRPCError({ + code: 'INTERNAL_SERVER_ERROR', + message: 'Failed to send chat message', + }); + } }), // Instance lifecycle