From 6c95f2e57cb66ac7f80d0d21bd72f973db600617 Mon Sep 17 00:00:00 2001 From: evan-runloopai <155475557+evan-runloopai@users.noreply.github.com> Date: Mon, 8 Sep 2025 15:24:10 -0700 Subject: [PATCH 1/4] Add stream reconnection --- src/resources/devboxes/executions.ts | 29 ++++++++++++++++++---------- src/streaming.ts | 6 +++++- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/resources/devboxes/executions.ts b/src/resources/devboxes/executions.ts index fb5d6661a..316257f72 100755 --- a/src/resources/devboxes/executions.ts +++ b/src/resources/devboxes/executions.ts @@ -7,6 +7,7 @@ import * as Core from '../../core'; import * as DevboxesAPI from './devboxes'; import { PollingOptions, poll } from '@runloop/api-client/lib/polling'; import { Stream } from '../../streaming'; +import { withStreamAutoReconnect } from '@runloop/api-client/lib/streaming-reconnection'; export class Executions extends APIResource { /** @@ -153,11 +154,15 @@ export class Executions extends APIResource { headers: defaultHeaders, ...options, }; - return this._client.get(`/v1/devboxes/${devboxId}/executions/${executionId}/stream_stderr_updates`, { - query, - ...mergedOptions, - stream: true, - }) as APIPromise>; + const getStream: (offset: number | undefined) => APIPromise> = (offset) => + this._client.get(`/v1/devboxes/${devboxId}/executions/${executionId}/stream_stderr_updates`, { + query: { ...query, offset: offset?.toString() }, + ...mergedOptions, + stream: true, + }); + return withStreamAutoReconnect(getStream, (item) => item.offset) as APIPromise< + Stream + >; } /** @@ -176,11 +181,15 @@ export class Executions extends APIResource { headers: defaultHeaders, ...options, }; - return this._client.get(`/v1/devboxes/${devboxId}/executions/${executionId}/stream_stdout_updates`, { - query, - ...mergedOptions, - stream: true, - }) as APIPromise>; + const getStream: (offset: number | undefined) => APIPromise> = (offset) => + this._client.get(`/v1/devboxes/${devboxId}/executions/${executionId}/stream_stdout_updates`, { + query: { ...query, offset: offset?.toString() }, + ...mergedOptions, + stream: true, + }); + return withStreamAutoReconnect(getStream, (item) => item.offset) as APIPromise< + Stream + >; } } diff --git a/src/streaming.ts b/src/streaming.ts index b056fddf6..9c69d324d 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -1,5 +1,5 @@ import { ReadableStream, type Response } from './_shims/index'; -import { RunloopError } from './error'; +import { APIError, RunloopError } from './error'; import { findDoubleNewlineIndex, LineDecoder } from './internal/decoders/line'; import { ReadableStreamToAsyncIterable } from './internal/stream-utils'; @@ -33,6 +33,10 @@ export class Stream implements AsyncIterable { try { for await (const sse of _iterSSEMessages(response, controller)) { try { + if (sse.event === 'error') { + const error = JSON.parse(sse.data); + throw new APIError(error.code, error, error.message, undefined); + } yield JSON.parse(sse.data); } catch (e) { console.error(`Could not parse message into JSON:`, sse.data); From 4896ea74c4163b9c288f8a18ac1017a862ef4d9a Mon Sep 17 00:00:00 2001 From: evan-runloopai <155475557+evan-runloopai@users.noreply.github.com> Date: Mon, 8 Sep 2025 15:24:17 -0700 Subject: [PATCH 2/4] utils --- src/lib/streaming-reconnection.ts | 40 +++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 src/lib/streaming-reconnection.ts diff --git a/src/lib/streaming-reconnection.ts b/src/lib/streaming-reconnection.ts new file mode 100644 index 000000000..dc59cf246 --- /dev/null +++ b/src/lib/streaming-reconnection.ts @@ -0,0 +1,40 @@ +import { Stream } from '../streaming'; +import { APIError } from '../error'; + +/** + * Wraps a stream with automatic reconnection on timeout. + * + * @param stream The stream to wrap + * @param recreator Function that creates a new stream with the last offset + * @returns A new stream that automatically reconnects on timeout + */ +export async function withStreamAutoReconnect( + streamCreator: (offset: number | undefined) => Promise>, + getOffset: (item: Item) => number | undefined, +): Promise> { + let lastOffset: number | undefined = undefined; + let currentStream = await streamCreator(lastOffset); + + async function* createReconnectingIterator(): AsyncIterator { + while (true) { + try { + for await (const item of currentStream) { + if (getOffset(item) !== undefined) { + lastOffset = getOffset(item); + } + yield item; + } + return; // Stream completed normally + } catch (error) { + if (error instanceof APIError && error.status === 408) { + // Reconnect with the last known offset + currentStream = await streamCreator(lastOffset); + continue; + } + throw error; // Not a timeout, rethrow + } + } + } + + return new Stream(createReconnectingIterator, currentStream.controller); +} From 2460318a159829d73aeaba6355aa726bd4f484bb Mon Sep 17 00:00:00 2001 From: evan-runloopai <155475557+evan-runloopai@users.noreply.github.com> Date: Mon, 8 Sep 2025 15:25:07 -0700 Subject: [PATCH 3/4] cp --- src/lib/streaming-reconnection.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/lib/streaming-reconnection.ts b/src/lib/streaming-reconnection.ts index dc59cf246..f406bfa32 100644 --- a/src/lib/streaming-reconnection.ts +++ b/src/lib/streaming-reconnection.ts @@ -3,10 +3,6 @@ import { APIError } from '../error'; /** * Wraps a stream with automatic reconnection on timeout. - * - * @param stream The stream to wrap - * @param recreator Function that creates a new stream with the last offset - * @returns A new stream that automatically reconnects on timeout */ export async function withStreamAutoReconnect( streamCreator: (offset: number | undefined) => Promise>, From e434425e9337bad158b9d9854848c5b3142de9fe Mon Sep 17 00:00:00 2001 From: evan-runloopai <155475557+evan-runloopai@users.noreply.github.com> Date: Mon, 8 Sep 2025 16:03:39 -0700 Subject: [PATCH 4/4] cp --- src/streaming.ts | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/streaming.ts b/src/streaming.ts index 9c69d324d..49afd291d 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -32,11 +32,19 @@ export class Stream implements AsyncIterable { let done = false; try { for await (const sse of _iterSSEMessages(response, controller)) { - try { - if (sse.event === 'error') { - const error = JSON.parse(sse.data); - throw new APIError(error.code, error, error.message, undefined); + if (sse.event === 'error') { + let error: APIError | Error | undefined = new Error(sse.data); + try { + const errorObj = JSON.parse(sse.data); + error = new APIError(parseInt(errorObj.code), errorObj, errorObj.message, undefined); + } catch (e) { + console.error('Could not parse error message into JSON:', sse.data); + error = new Error(sse.data); } + throw error; + } + + try { yield JSON.parse(sse.data); } catch (e) { console.error(`Could not parse message into JSON:`, sse.data);