Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions kiloclaw/src/routes/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { deriveGatewayToken } from '../auth/gateway-token';
import { sandboxIdFromUserId } from '../auth/sandbox-id';
import { writeEvent } from '../utils/analytics';
import { deriveHttpEventName } from '../middleware/analytics';
import { sendMessage } from '../stream-chat/client';

const GmailHistoryIdSchema = z.object({
userId: z.string().min(1),
Expand Down Expand Up @@ -184,6 +185,8 @@ const SAFE_ERROR_PREFIXES = [
'Cannot restore: ', // snapshot restore: bad state
'Cannot destroy: ', // destroy while restoring
'Cannot retry recovery', // force-retry-recovery guard messages
'Stream Chat sendMessage failed', // sendMessage HTTP errors
'Stream Chat is not set up', // no Stream Chat on this instance
];

function sanitizeError(err: unknown, operation: string): { message: string; status: number } {
Expand Down Expand Up @@ -1194,6 +1197,52 @@ platform.get('/stream-chat-credentials', async c => {
}
});

// POST /api/platform/send-chat-message
// Send a message to a KiloClaw instance's Stream Chat channel as the human user.
// The OpenClaw bot picks it up and responds as if the user typed it.
const SendChatMessageSchema = z.object({
userId: z.string().min(1),
instanceId: z.string().uuid().optional(),
message: z.string().min(1).max(32_000),
});

platform.post('/send-chat-message', async c => {
const body: unknown = await c.req.json().catch(() => null);
const parsed = SendChatMessageSchema.safeParse(body);
if (!parsed.success) {
return jsonError('Invalid request body: userId and message are required', 400);
}

const { userId, instanceId, message } = parsed.data;
c.set('userId', userId);

const apiKey = c.env.STREAM_CHAT_API_KEY;
const apiSecret = c.env.STREAM_CHAT_API_SECRET;
if (!apiKey || !apiSecret) {
return jsonError('Stream Chat is not configured', 503);
}

try {
// Get the sandboxId and channel info from the DO
const creds = await withDORetry(
instanceStubFactory(c.env, userId, instanceId),
stub => stub.getStreamChatCredentials(),
'getStreamChatCredentials'
);

if (!creds) {
return jsonError('Stream Chat is not set up for this instance', 404);
}

await sendMessage(apiKey, apiSecret, creds.channelId, creds.userId, message);

return c.json({ success: true, channelId: creds.channelId });
} catch (err) {
const { message: errMsg, status } = sanitizeError(err, 'send-chat-message');
return jsonError(errMsg, status);
}
});

// GET /api/platform/debug-status?userId=...&instanceId=...
// Internal/admin-only debug status that includes DO destroy internals.
platform.get('/debug-status', async c => {
Expand Down
100 changes: 100 additions & 0 deletions kiloclaw/src/stream-chat/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
deactivateStreamChatUsers,
reactivateStreamChatUsers,
setupDefaultStreamChatChannel,
sendMessage,
} from './client';

// Decode a JWT payload without verifying signature (for test assertions only).
Expand Down Expand Up @@ -375,3 +376,102 @@ describe('setupDefaultStreamChatChannel', () => {
expect(result.botUserId).toBe('bot-sandbox-new');
});
});

describe('sendMessage', () => {
const mockFetch = vi.fn();

beforeEach(() => {
vi.stubGlobal('fetch', mockFetch);
});

afterEach(() => {
vi.unstubAllGlobals();
mockFetch.mockReset();
});

it('POSTs to /channels/messaging/{channelId}/message with correct payload', async () => {
mockFetch.mockResolvedValueOnce({ ok: true, status: 201 });

await sendMessage(
'my-api-key',
'my-api-secret',
'default-sandbox-abc',
'sandbox-abc',
'Hello bot!'
);

expect(mockFetch).toHaveBeenCalledOnce();
const [url, opts] = mockFetch.mock.calls[0] as [
string,
RequestInit & { headers: Record<string, string> },
];
expect(url).toBe(
'https://chat.stream-io-api.com/channels/messaging/default-sandbox-abc/message?api_key=my-api-key'
);
expect(opts.method).toBe('POST');
expect(opts.headers['Content-Type']).toBe('application/json');
expect(opts.headers['Stream-Auth-Type']).toBe('jwt');
// Authorization header should be a server JWT
expect(opts.headers['Authorization']).toBeDefined();
expect(opts.headers['Authorization'].split('.')).toHaveLength(3);

const body = JSON.parse(opts.body as string) as { message: { text: string; user_id: string } };
expect(body.message.text).toBe('Hello bot!');
expect(body.message.user_id).toBe('sandbox-abc');
});

it('uses a server token (server: true) for authentication', async () => {
mockFetch.mockResolvedValueOnce({ ok: true, status: 201 });

await sendMessage('key', 'secret', 'chan-1', 'user-1', 'test');

const [, opts] = mockFetch.mock.calls[0] as [
string,
RequestInit & { headers: Record<string, string> },
];
const payload = decodeJwtPayload(opts.headers['Authorization']);
expect(payload.server).toBe(true);
});

it('throws on HTTP error with status and body in the message', async () => {
mockFetch.mockResolvedValueOnce({
ok: false,
status: 403,
text: async () => 'User is deactivated',
});

await expect(sendMessage('key', 'secret', 'chan-1', 'user-1', 'test')).rejects.toThrow(
'Stream Chat sendMessage failed (403): User is deactivated'
);
});

it('preserves HTTP status on the thrown error for upstream handling', async () => {
mockFetch.mockResolvedValueOnce({
ok: false,
status: 404,
text: async () => 'Channel not found',
});

try {
await sendMessage('key', 'secret', 'chan-1', 'user-1', 'test');
expect.unreachable('should have thrown');
} catch (err) {
expect(err).toBeInstanceOf(Error);
expect((err as Error & { status: number }).status).toBe(404);
}
});

it('handles unreadable error body gracefully', async () => {
mockFetch.mockResolvedValueOnce({
ok: false,
status: 500,
text: async () => {
throw new Error('body read error');
},
});

await expect(sendMessage('key', 'secret', 'chan-1', 'user-1', 'test')).rejects.toThrow(
'Stream Chat sendMessage failed (500): (unreadable)'
);
});
});
45 changes: 45 additions & 0 deletions kiloclaw/src/stream-chat/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,48 @@ export async function setupDefaultStreamChatChannel(

return { apiKey, botUserId, botUserToken, channelId };
}

/**
* Send a message to a Stream Chat channel on behalf of a user.
*
* Used to programmatically inject messages into a KiloClaw instance's chat
* channel. The message appears as if the user typed it, so the OpenClaw bot
* (listening via the openclaw-channel-streamchat plugin) processes and responds.
*
* @param apiKey Stream Chat API key
* @param apiSecret Stream Chat API secret (used to mint a server JWT)
* @param channelId Target channel ID, e.g. `default-{sandboxId}`
* @param userId The user ID to send the message as (typically the sandboxId)
* @param text Plain-text message content
*/
export async function sendMessage(
apiKey: string,
apiSecret: string,
channelId: string,
userId: string,
text: string
): Promise<void> {
const serverToken = await createServerToken(apiSecret);

const res = await fetch(
`${STREAM_CHAT_API_BASE}/channels/messaging/${channelId}/message?api_key=${apiKey}`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Stream-Auth-Type': 'jwt',
Authorization: serverToken,
},
body: JSON.stringify({
message: { text, user_id: userId },
}),
}
);

if (!res.ok) {
const body = await res.text().catch(() => '(unreadable)');
throw Object.assign(new Error(`Stream Chat sendMessage failed (${res.status}): ${body}`), {
status: res.status,
});
}
}
15 changes: 15 additions & 0 deletions src/lib/kiloclaw/kiloclaw-internal-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,21 @@ export class KiloClawInternalClient {
});
}

async sendChatMessage(
userId: string,
message: string,
instanceId?: string
): Promise<{ success: boolean; channelId: string }> {
return this.request(
'/api/platform/send-chat-message',
{
method: 'POST',
body: JSON.stringify({ userId, message, instanceId }),
},
{ userId }
);
}

async getDebugStatus(userId: string, instanceId?: string): Promise<PlatformDebugStatusResponse> {
const params = new URLSearchParams({ userId });
if (instanceId) params.set('instanceId', instanceId);
Expand Down
62 changes: 62 additions & 0 deletions src/routers/kiloclaw-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,68 @@ export const kiloclawRouter = createTRPCRouter({
return client.getStreamChatCredentials(ctx.user.id);
}),

sendChatMessage: clawAccessProcedure
.input(
z.object({
instanceId: z.string().uuid().optional(),
message: z.string().min(1).max(32_000),
})
)
.mutation(async ({ ctx, input }) => {
if (input.instanceId) {
// Explicit instanceId: verify ownership and non-destroyed
const [row] = await db
.select({ id: kiloclaw_instances.id })
.from(kiloclaw_instances)
.where(
and(
eq(kiloclaw_instances.id, input.instanceId),
eq(kiloclaw_instances.user_id, ctx.user.id),
isNull(kiloclaw_instances.destroyed_at)
)
)
.limit(1);
if (!row) {
throw new TRPCError({
code: 'NOT_FOUND',
message: 'No active KiloClaw instance found',
});
}
} else {
// No instanceId: verify the user has any active instance
const instance = await getActiveInstance(ctx.user.id);
if (!instance) {
throw new TRPCError({
code: 'NOT_FOUND',
message: 'No active KiloClaw instance found',
});
}
}

const client = new KiloClawInternalClient();
try {
return await client.sendChatMessage(ctx.user.id, input.message, input.instanceId);
} catch (err) {
if (err instanceof KiloClawApiError) {
const { message } = getKiloClawApiErrorPayload(err);
const code =
err.statusCode === 404
? 'NOT_FOUND'
: err.statusCode === 503
? 'PRECONDITION_FAILED'
: 'INTERNAL_SERVER_ERROR';
throw new TRPCError({
code,
message: message ?? 'Failed to send chat message',
});
}
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Failed to send chat message',
});
}
}),

// Instance lifecycle
start: clawAccessProcedure.mutation(async ({ ctx }) => {
const client = new KiloClawInternalClient();
Expand Down
Loading