Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/late-owls-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/client': patch
---

Expose the ShapeStream.fetchSnapshot as a public api that can be used to fetch a snapshot without it being injected into the emitted stream of change messages. This is useful for cases where the user wants to handle the application of these snapshot in a custom way.
103 changes: 71 additions & 32 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
InvalidSignalError,
MissingShapeHandleError,
ReservedParamError,
MissingHeadersError,
} from './error'
import {
BackoffDefaults,
Expand Down Expand Up @@ -368,16 +369,15 @@ export interface ShapeStreamInterface<T extends Row<unknown> = Row> {

forceDisconnectAndRefresh(): Promise<void>

requestSnapshot(params: {
where?: string
params?: Record<string, string>
limit: number
offset?: number
orderBy: string
}): Promise<{
requestSnapshot(params: SubsetParams): Promise<{
metadata: SnapshotMetadata
data: Array<Message<T>>
}>

fetchSnapshot(opts: SubsetParams): Promise<{
metadata: SnapshotMetadata
data: Array<ChangeMessage<T>>
}>
}

/**
Expand Down Expand Up @@ -774,7 +774,10 @@ export class ShapeStream<T extends Row<unknown> = Row>
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset)
fetchUrl.searchParams.set(LOG_MODE_QUERY_PARAM, this.#mode)

if (this.#isUpToDate) {
// Snapshot requests (with subsetParams) should never use live polling
const isSnapshotRequest = subsetParams !== undefined

if (this.#isUpToDate && !isSnapshotRequest) {
// If we are resuming from a paused state, we don't want to perform a live request
// because it could be a long poll that holds for 20sec
// and during all that time `isConnected` will be false
Expand Down Expand Up @@ -846,11 +849,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#liveCacheBuster = liveCacheBuster
}

const getSchema = (): Schema => {
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER)
return schemaHeader ? JSON.parse(schemaHeader) : {}
}
this.#schema = this.#schema ?? getSchema()
this.#schema = this.#schema ?? getSchemaFromHeaders(headers)

// NOTE: 204s are deprecated, the Electric server should not
// send these in latest versions but this is here for backwards
Expand Down Expand Up @@ -1237,7 +1236,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
}

/**
* Request a snapshot for subset of data.
* Request a snapshot for subset of data and inject it into the subscribed data stream.
*
* Only available when mode is `changes_only`.
* Returns the insertion point & the data, but more importantly injects the data
Expand Down Expand Up @@ -1275,16 +1274,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#pause()
}

const { fetchUrl, requestHeaders } = await this.#constructUrl(
this.options.url,
true,
opts
)

const { metadata, data } = await this.#fetchSnapshot(
fetchUrl,
requestHeaders
)
const { metadata, data } = await this.fetchSnapshot(opts)

const dataWithEndBoundary = (data as Array<Message<T>>).concat([
{ headers: { control: `snapshot-end`, ...metadata } },
Expand All @@ -1309,30 +1299,79 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
}

async #fetchSnapshot(url: URL, headers: Record<string, string>) {
const response = await this.#fetchClient(url.toString(), { headers })
/**
* Fetch a snapshot for subset of data.
* Returns the metadata and the data, but does not inject it into the subscribed data stream.
*
* @param opts - The options for the snapshot request.
* @returns The metadata and the data for the snapshot.
*/
async fetchSnapshot(opts: SubsetParams): Promise<{
metadata: SnapshotMetadata
data: Array<ChangeMessage<T>>
}> {
const { fetchUrl, requestHeaders } = await this.#constructUrl(
this.options.url,
true,
opts
)

const response = await this.#fetchClient(fetchUrl.toString(), {
headers: requestHeaders,
})

if (!response.ok) {
throw new FetchError(
response.status,
undefined,
undefined,
Object.fromEntries([...response.headers.entries()]),
url.toString()
fetchUrl.toString()
)
}

const { metadata, data } = await response.json()
const batch = this.#messageParser.parse<Array<ChangeMessage<T>>>(
JSON.stringify(data),
this.#schema!
// Use schema from stream if available, otherwise extract from response header
const schema: Schema =
this.#schema ??
getSchemaFromHeaders(response.headers, {
required: true,
url: fetchUrl.toString(),
})

const { metadata, data: rawData } = await response.json()
const data = this.#messageParser.parseSnapshotData<ChangeMessage<T>>(
rawData,
schema
)

return {
metadata,
data: batch,
data,
}
}
}

/**
* Extracts the schema from response headers.
* @param headers - The response headers
* @param options - Options for schema extraction
* @param options.required - If true, throws MissingHeadersError when header is missing. Defaults to false.
* @param options.url - The URL to include in the error message if required is true
* @returns The parsed schema, or an empty object if not required and header is missing
* @throws {MissingHeadersError} if required is true and the header is missing
*/
function getSchemaFromHeaders(
headers: Headers,
options?: { required?: boolean; url?: string }
): Schema {
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER)
if (!schemaHeader) {
if (options?.required && options?.url) {
throw new MissingHeadersError(options.url, [SHAPE_SCHEMA_HEADER])
}
return {}
}
return JSON.parse(schemaHeader)
}

/**
Expand Down
52 changes: 45 additions & 7 deletions packages/typescript-client/src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,56 @@ export class MessageParser<T extends Row<unknown>> {
typeof value === `object` &&
value !== null
) {
// Parse the row values
const row = value as Record<string, Value<GetExtensions<T>>>
Object.keys(row).forEach((key) => {
row[key] = this.parseRow(key, row[key] as NullableToken, schema)
})

if (this.transformer) value = this.transformer(value)
return this.transformMessageValue(value, schema)
}
return value
}) as Result
}

/**
* Parse an array of ChangeMessages from a snapshot response.
* Applies type parsing and transformations to the value and old_value properties.
*/
parseSnapshotData<Result>(
messages: Array<unknown>,
schema: Schema
): Array<Result> {
return messages.map((message) => {
const msg = message as Record<string, unknown>

// Transform the value property if it exists
if (msg.value && typeof msg.value === `object` && msg.value !== null) {
msg.value = this.transformMessageValue(msg.value, schema)
}

// Transform the old_value property if it exists
if (
msg.old_value &&
typeof msg.old_value === `object` &&
msg.old_value !== null
) {
msg.old_value = this.transformMessageValue(msg.old_value, schema)
}

return msg as Result
})
}

/**
* Transform a message value or old_value object by parsing its columns.
*/
private transformMessageValue(
value: unknown,
schema: Schema
): Row<GetExtensions<T>> {
const row = value as Record<string, Value<GetExtensions<T>>>
Object.keys(row).forEach((key) => {
row[key] = this.parseRow(key, row[key] as NullableToken, schema)
})

return this.transformer ? this.transformer(row) : row
}

// Parses the message values using the provided parser based on the schema information
private parseRow(
key: string,
Expand Down
Loading
Loading