diff --git a/.changeset/late-owls-wave.md b/.changeset/late-owls-wave.md new file mode 100644 index 0000000000..d3b7e22f63 --- /dev/null +++ b/.changeset/late-owls-wave.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/client': patch +--- + +Expose ShapeStream.fetchSnapshot as a public API that can be used to fetch a snapshot without it being injected into the emitted stream of change messages. This is useful for cases where the user wants to handle the application of these snapshots in a custom way. diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index f73b40bc6d..2e01a7b1c3 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -17,6 +17,7 @@ import { InvalidSignalError, MissingShapeHandleError, ReservedParamError, + MissingHeadersError, } from './error' import { BackoffDefaults, @@ -368,16 +369,15 @@ export interface ShapeStreamInterface = Row> { forceDisconnectAndRefresh(): Promise - requestSnapshot(params: { - where?: string - params?: Record - limit: number - offset?: number - orderBy: string - }): Promise<{ + requestSnapshot(params: SubsetParams): Promise<{ metadata: SnapshotMetadata data: Array> }> + + fetchSnapshot(opts: SubsetParams): Promise<{ + metadata: SnapshotMetadata + data: Array> + }> } /** @@ -774,7 +774,10 @@ export class ShapeStream = Row> fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset) fetchUrl.searchParams.set(LOG_MODE_QUERY_PARAM, this.#mode) - if (this.#isUpToDate) { + // Snapshot requests (with subsetParams) should never use live polling + const isSnapshotRequest = subsetParams !== undefined + + if (this.#isUpToDate && !isSnapshotRequest) { // If we are resuming from a paused state, we don't want to perform a live request // because it could be a long poll that holds for 20sec // and during all that time `isConnected` will be false @@ -846,11 +849,7 @@ export class ShapeStream = Row> this.#liveCacheBuster = liveCacheBuster } - const 
getSchema = (): Schema => { - const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER) - return schemaHeader ? JSON.parse(schemaHeader) : {} - } - this.#schema = this.#schema ?? getSchema() + this.#schema = this.#schema ?? getSchemaFromHeaders(headers) // NOTE: 204s are deprecated, the Electric server should not // send these in latest versions but this is here for backwards @@ -1237,7 +1236,7 @@ export class ShapeStream = Row> } /** - * Request a snapshot for subset of data. + * Request a snapshot for a subset of data and inject it into the subscribed data stream. * * Only available when mode is `changes_only`. * Returns the insertion point & the data, but more importantly injects the data @@ -1275,16 +1274,7 @@ export class ShapeStream = Row> this.#pause() } - const { fetchUrl, requestHeaders } = await this.#constructUrl( - this.options.url, - true, - opts - ) - - const { metadata, data } = await this.#fetchSnapshot( - fetchUrl, - requestHeaders - ) + const { metadata, data } = await this.fetchSnapshot(opts) const dataWithEndBoundary = (data as Array>).concat([ { headers: { control: `snapshot-end`, ...metadata } }, @@ -1309,8 +1299,26 @@ export class ShapeStream = Row> } } - async #fetchSnapshot(url: URL, headers: Record) { - const response = await this.#fetchClient(url.toString(), { headers }) + /** + * Fetch a snapshot for a subset of data. + * Returns the metadata and the data, but does not inject them into the subscribed data stream. + * + * @param opts - The options for the snapshot request. + * @returns The metadata and the data for the snapshot. 
+ */ + async fetchSnapshot(opts: SubsetParams): Promise<{ + metadata: SnapshotMetadata + data: Array> + }> { + const { fetchUrl, requestHeaders } = await this.#constructUrl( + this.options.url, + true, + opts + ) + + const response = await this.#fetchClient(fetchUrl.toString(), { + headers: requestHeaders, + }) if (!response.ok) { throw new FetchError( @@ -1318,21 +1326,52 @@ export class ShapeStream = Row> undefined, undefined, Object.fromEntries([...response.headers.entries()]), - url.toString() + fetchUrl.toString() ) } - const { metadata, data } = await response.json() - const batch = this.#messageParser.parse>>( - JSON.stringify(data), - this.#schema! + // Use schema from stream if available, otherwise extract from response header + const schema: Schema = + this.#schema ?? + getSchemaFromHeaders(response.headers, { + required: true, + url: fetchUrl.toString(), + }) + + const { metadata, data: rawData } = await response.json() + const data = this.#messageParser.parseSnapshotData>( + rawData, + schema ) return { metadata, - data: batch, + data, + } + } +} + +/** + * Extracts the schema from response headers. + * @param headers - The response headers + * @param options - Options for schema extraction + * @param options.required - If true and options.url is provided, throws MissingHeadersError when the header is missing. Defaults to false. 
+ * @param options.url - The URL passed to MissingHeadersError; the error is only thrown when this is provided + * @returns The parsed schema, or an empty object if the header is missing and no error is thrown + * @throws {MissingHeadersError} if required is true, a url is provided, and the header is missing + */ +function getSchemaFromHeaders( + headers: Headers, + options?: { required?: boolean; url?: string } +): Schema { + const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER) + if (!schemaHeader) { + if (options?.required && options?.url) { + throw new MissingHeadersError(options.url, [SHAPE_SCHEMA_HEADER]) } + return {} } + return JSON.parse(schemaHeader) } /** diff --git a/packages/typescript-client/src/parser.ts b/packages/typescript-client/src/parser.ts index e6325ae972..4689848bb3 100644 --- a/packages/typescript-client/src/parser.ts +++ b/packages/typescript-client/src/parser.ts @@ -122,18 +122,56 @@ export class MessageParser> { typeof value === `object` && value !== null ) { - // Parse the row values - const row = value as Record>> - Object.keys(row).forEach((key) => { - row[key] = this.parseRow(key, row[key] as NullableToken, schema) - }) - - if (this.transformer) value = this.transformer(value) + return this.transformMessageValue(value, schema) } return value }) as Result } + /** + * Parse an array of ChangeMessages from a snapshot response. + * Applies type parsing and transformations to the value and old_value properties. 
+ */ + parseSnapshotData( + messages: Array, + schema: Schema + ): Array { + return messages.map((message) => { + const msg = message as Record + + // Transform the value property if it exists + if (msg.value && typeof msg.value === `object` && msg.value !== null) { + msg.value = this.transformMessageValue(msg.value, schema) + } + + // Transform the old_value property if it exists + if ( + msg.old_value && + typeof msg.old_value === `object` && + msg.old_value !== null + ) { + msg.old_value = this.transformMessageValue(msg.old_value, schema) + } + + return msg as Result + }) + } + + /** + * Transform a message value or old_value object by parsing its columns. + */ + private transformMessageValue( + value: unknown, + schema: Schema + ): Row> { + const row = value as Record>> + Object.keys(row).forEach((key) => { + row[key] = this.parseRow(key, row[key] as NullableToken, schema) + }) + + return this.transformer ? this.transformer(row) : row + } + // Parses the message values using the provided parser based on the schema information private parseRow( key: string, diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index d797fe3339..cd44012bb7 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -2048,6 +2048,225 @@ describe.for(fetchAndSse)( { timeout: 4000 } ) }) + + it( + `fetchSnapshot should return data and metadata without modifying shape state`, + { timeout: 10000 }, + async ({ issuesTableUrl, insertIssues, aborter }) => { + await insertIssues({ title: `A` }, { title: `B` }, { title: `C` }) + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { table: issuesTableUrl }, + log: `changes_only`, + liveSse, + signal: aborter.signal, + }) + + const shape = new Shape(shapeStream) + + // Initial state should be empty + expect(await shape.rows).toEqual([]) + + // Track messages published to the stream + const seenKeys = new 
Set() + const unsub = shapeStream.subscribe((msgs) => { + for (const m of msgs) { + if (isChangeMessage(m)) seenKeys.add(m.key) + } + }) + + // fetchSnapshot should work independently, but wait for stream to initialize + // (so we can test that it uses the schema from the stream if available) + await vi.waitFor( + () => { + expect(shapeStream.hasStarted()).toBe(true) + }, + { timeout: 5000 } + ) + + // Test with orderBy + const { metadata, data } = await shapeStream.fetchSnapshot({ + orderBy: `title ASC`, + limit: 100, + }) + + // Verify fetchSnapshot returned data and metadata + expect(data.length).toBe(3) + const returnedTitles = data.map((m) => m.value.title) + expect(returnedTitles).toEqual([`A`, `B`, `C`]) + expect(metadata).toBeDefined() + expect(typeof metadata).toBe(`object`) + + // Verify shape state was NOT modified (should still be empty) + expect(shape.currentRows).toEqual([]) + + // Verify no messages were published to the stream + expect(seenKeys.size).toBe(0) + + // Test with where clause + const { data: filteredData } = await shapeStream.fetchSnapshot({ + where: `title = 'B'`, + orderBy: `title ASC`, + limit: 100, + }) + expect(filteredData.length).toBe(1) + expect(filteredData[0].value.title).toBe(`B`) + + // Test with parametrised where clause + const { data: paramData } = await shapeStream.fetchSnapshot({ + where: `title = $1 OR title = $2`, + params: { '1': `A`, '2': `C` }, + orderBy: `title ASC`, + limit: 100, + }) + const paramTitles = paramData.map((m) => m.value.title).sort() + expect(paramTitles).toEqual([`A`, `C`]) + + // Test with orderBy + limit + const { data: limitedData } = await shapeStream.fetchSnapshot({ + orderBy: `title DESC`, + limit: 2, + }) + const limitedTitles = limitedData.map((m) => m.value.title) + expect(limitedTitles).toEqual([`C`, `B`]) + + // Verify shape state still unchanged after all fetches + expect(shape.currentRows).toEqual([]) + expect(seenKeys.size).toBe(0) + + unsub() + } + ) + + it(`fetchSnapshot does not 
interfere with stream updates`, async ({ + issuesTableUrl, + insertIssues, + updateIssue, + waitForIssues, + aborter, + }) => { + const [id] = await insertIssues({ title: `before` }) + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { table: issuesTableUrl }, + log: `changes_only`, + liveSse, + signal: aborter.signal, + }) + const shape = new Shape(shapeStream) + + // Subscribe to capture messages before fetchSnapshot + const messages: Message[] = [] + const unsub = shapeStream.subscribe((msgs) => { + messages.push(...msgs) + }) + + // Wait for stream to be initialized and up-to-date + await vi.waitFor( + () => { + expect( + messages.some( + (m) => isControlMessage(m) && m.headers.control === `up-to-date` + ) + ).toBe(true) + }, + { timeout: 10000 } + ) + + // Fetch a snapshot (should not affect stream) + const { data: snapshotData } = await shapeStream.fetchSnapshot({ + orderBy: `title ASC`, + limit: 100, + }) + expect(snapshotData.length).toBe(1) + expect(snapshotData[0].value.title).toBe(`before`) + + // Shape should still be empty (fetchSnapshot doesn't modify state) + expect(shape.currentRows).toEqual([]) + + // Now perform an update and ensure it is streamed normally + await updateIssue({ id, title: `after` }) + await waitForIssues({ numChangesExpected: 1 }) + + await vi.waitFor( + () => { + const updateMsg = messages.find( + (m) => isChangeMessage(m) && m.headers.operation === `update` + ) as ChangeMessage | undefined + expect(updateMsg?.value?.title).toBe(`after`) + }, + { timeout: 10000 } + ) + unsub() + }) + + it( + `fetchSnapshot handles errors correctly`, + { timeout: 5000 }, + async ({ issuesTableUrl, aborter }) => { + // Create a fetchClient that returns an error immediately for snapshot requests + // We need to bypass backoff retries for this test, so we'll use maxRetries: 0 + let snapshotRequestCount = 0 + const fetchClient = vi.fn( + async (input: string | URL | Request, _init?: RequestInit) => { + const url = input 
instanceof Request ? input.url : input.toString() + const urlObj = new URL(url) + // Check if this is a snapshot request (has subset params with double underscores) + const isSnapshotRequest = + urlObj.searchParams.has(`subset__where`) || + urlObj.searchParams.has(`subset__limit`) || + urlObj.searchParams.has(`subset__order_by`) + if (isSnapshotRequest) { + snapshotRequestCount++ + return new Response(`{"error": "Internal Server Error"}`, { + status: 500, + statusText: `Internal Server Error`, + headers: { 'Content-Type': `application/json` }, + }) + } + // For normal requests, return a minimal response to avoid hanging on database + // (we're only testing snapshot error handling, not the full stream) + return new Response(`[]`, { + status: 200, + headers: { + 'electric-schema': JSON.stringify({}), + 'electric-offset': `0_0`, + }, + }) + } + ) + + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + params: { table: issuesTableUrl }, + log: `changes_only`, + liveSse, + signal: aborter.signal, + fetchClient, + // Disable retries for this test to ensure errors are thrown immediately + backoffOptions: { + maxRetries: 0, + initialDelay: 0, + maxDelay: 0, + multiplier: 1, + }, + }) + + // fetchSnapshot should work independently - call it immediately + // The error should be thrown immediately (500 is not retried, and we disabled retries anyway) + await expect( + shapeStream.fetchSnapshot({ + orderBy: `title ASC`, + limit: 100, + }) + ).rejects.toThrow(FetchError) + + // Verify the snapshot request was made + expect(snapshotRequestCount).toBeGreaterThan(0) + } + ) } )