diff --git a/src/lib/streaming-reconnection.ts b/src/lib/streaming-reconnection.ts new file mode 100644 index 000000000..f406bfa32 --- /dev/null +++ b/src/lib/streaming-reconnection.ts @@ -0,0 +1,36 @@ +import { Stream } from '../streaming'; +import { APIError } from '../error'; + +/** + * Wraps a stream with automatic reconnection on timeout. + */ +export async function withStreamAutoReconnect( + streamCreator: (offset: number | undefined) => Promise>, + getOffset: (item: Item) => number | undefined, +): Promise> { + let lastOffset: number | undefined = undefined; + let currentStream = await streamCreator(lastOffset); + + async function* createReconnectingIterator(): AsyncIterator { + while (true) { + try { + for await (const item of currentStream) { + if (getOffset(item) !== undefined) { + lastOffset = getOffset(item); + } + yield item; + } + return; // Stream completed normally + } catch (error) { + if (error instanceof APIError && error.status === 408) { + // Reconnect with the last known offset + currentStream = await streamCreator(lastOffset); + continue; + } + throw error; // Not a timeout, rethrow + } + } + } + + return new Stream(createReconnectingIterator, currentStream.controller); +} diff --git a/src/resources/devboxes/executions.ts b/src/resources/devboxes/executions.ts index fb5d6661a..316257f72 100755 --- a/src/resources/devboxes/executions.ts +++ b/src/resources/devboxes/executions.ts @@ -7,6 +7,7 @@ import * as Core from '../../core'; import * as DevboxesAPI from './devboxes'; import { PollingOptions, poll } from '@runloop/api-client/lib/polling'; import { Stream } from '../../streaming'; +import { withStreamAutoReconnect } from '@runloop/api-client/lib/streaming-reconnection'; export class Executions extends APIResource { /** @@ -153,11 +154,15 @@ export class Executions extends APIResource { headers: defaultHeaders, ...options, }; - return this._client.get(`/v1/devboxes/${devboxId}/executions/${executionId}/stream_stderr_updates`, { - query, - ...mergedOptions, - stream: true, - }) as APIPromise>; + const getStream: (offset: number | undefined) => APIPromise> = (offset) => + this._client.get(`/v1/devboxes/${devboxId}/executions/${executionId}/stream_stderr_updates`, { + query: { ...query, offset: offset?.toString() }, + ...mergedOptions, + stream: true, + }); + return withStreamAutoReconnect(getStream, (item) => item.offset) as APIPromise< + Stream + >; } /** @@ -176,11 +181,15 @@ export class Executions extends APIResource { headers: defaultHeaders, ...options, }; - return this._client.get(`/v1/devboxes/${devboxId}/executions/${executionId}/stream_stdout_updates`, { - query, - ...mergedOptions, - stream: true, - }) as APIPromise>; + const getStream: (offset: number | undefined) => APIPromise> = (offset) => + this._client.get(`/v1/devboxes/${devboxId}/executions/${executionId}/stream_stdout_updates`, { + query: { ...query, offset: offset?.toString() }, + ...mergedOptions, + stream: true, + }); + return withStreamAutoReconnect(getStream, (item) => item.offset) as APIPromise< + Stream + >; } } diff --git a/src/streaming.ts b/src/streaming.ts index b056fddf6..49afd291d 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -1,5 +1,5 @@ import { ReadableStream, type Response } from './_shims/index'; -import { RunloopError } from './error'; +import { APIError, RunloopError } from './error'; import { findDoubleNewlineIndex, LineDecoder } from './internal/decoders/line'; import { ReadableStreamToAsyncIterable } from './internal/stream-utils'; @@ -32,6 +32,18 @@ export class Stream implements AsyncIterable { let done = false; try { for await (const sse of _iterSSEMessages(response, controller)) { + if (sse.event === 'error') { + let error: APIError | Error | undefined = new Error(sse.data); + try { + const errorObj = JSON.parse(sse.data); + error = new APIError(parseInt(errorObj.code), errorObj, errorObj.message, undefined); + } catch (e) { + console.error('Could not parse error message into JSON:', sse.data); + error = new Error(sse.data); + } + throw error; + } + try { yield JSON.parse(sse.data); } catch (e) {