diff --git a/.changeset/stream-binding-local-mode.md b/.changeset/stream-binding-local-mode.md new file mode 100644 index 0000000000..1f800be9f4 --- /dev/null +++ b/.changeset/stream-binding-local-mode.md @@ -0,0 +1,25 @@ +--- +"miniflare": minor +"wrangler": minor +--- + +Add local mode support for Stream bindings + +Miniflare and `wrangler dev` now support using [Cloudflare Stream](https://developers.cloudflare.com/stream/) bindings locally. + +Supported operations: + +- `upload()` — upload video via URL +- `video(id).details()`, `.update()`, `.delete()`, `.generateToken()` +- `videos.list()` +- `captions.generate()`, `.list()`, `.delete()` +- `downloads.generate()`, `.get()`, `.delete()` +- `watermarks.generate()`, `.list()`, `.get()`, `.delete()` + +The following are not yet supported in local mode and will throw: + +- `createDirectUpload()` +- Caption upload via `File` +- Watermark generation via `File` + +Data is persisted across restarts by default. You must set `streamPersist: false` in Miniflare options to disable persistence. diff --git a/packages/miniflare/src/index.ts b/packages/miniflare/src/index.ts index 6eca63cd3c..bf5817ca3f 100644 --- a/packages/miniflare/src/index.ts +++ b/packages/miniflare/src/index.ts @@ -70,6 +70,7 @@ import { SharedOptions, SOCKET_ENTRY, SOCKET_ENTRY_LOCAL, + STREAM_PLUGIN_NAME, WorkerOptions, WrappedBindingNames, } from "./plugins"; @@ -150,6 +151,7 @@ import type { KVNamespaceListKey, Queue, R2Bucket, + StreamBinding, } from "@cloudflare/workers-types/experimental"; import type { Process } from "@puppeteer/browsers"; @@ -2167,7 +2169,9 @@ export class Miniflare { const urlSafeHost = getURLSafeHost(configuredHost); if (this.#sharedOpts.core.logRequests) { this.#log.logReady( - `${ready} on ${green(`${secure ? "https" : "http"}://${urlSafeHost}:${entryPort}`)}` + `${ready} on ${green( + `${secure ? "https" : "http"}://${urlSafeHost}:${entryPort}` + )}` ); } @@ -2763,6 +2767,12 @@ export class Miniflare { }> { return this.#getProxy(HELLO_WORLD_PLUGIN_NAME, bindingName, workerName); } + getStreamBinding( + bindingName: string, + workerName?: string + ): Promise { + return this.#getProxy(STREAM_PLUGIN_NAME, bindingName, workerName); + } /** @internal */ _getInternalDurableObjectNamespace( diff --git a/packages/miniflare/src/plugins/index.ts b/packages/miniflare/src/plugins/index.ts index 449f88d051..9cc332befa 100644 --- a/packages/miniflare/src/plugins/index.ts +++ b/packages/miniflare/src/plugins/index.ts @@ -31,6 +31,7 @@ import { QUEUES_PLUGIN, QUEUES_PLUGIN_NAME } from "./queues"; import { R2_PLUGIN, R2_PLUGIN_NAME } from "./r2"; import { RATELIMIT_PLUGIN, RATELIMIT_PLUGIN_NAME } from "./ratelimit"; import { SECRET_STORE_PLUGIN, SECRET_STORE_PLUGIN_NAME } from "./secret-store"; +import { STREAM_PLUGIN, STREAM_PLUGIN_NAME } from "./stream"; import { VECTORIZE_PLUGIN, VECTORIZE_PLUGIN_NAME } from "./vectorize"; import { VERSION_METADATA_PLUGIN, @@ -63,6 +64,7 @@ export const PLUGINS = { [BROWSER_RENDERING_PLUGIN_NAME]: BROWSER_RENDERING_PLUGIN, [DISPATCH_NAMESPACE_PLUGIN_NAME]: DISPATCH_NAMESPACE_PLUGIN, [IMAGES_PLUGIN_NAME]: IMAGES_PLUGIN, + [STREAM_PLUGIN_NAME]: STREAM_PLUGIN, [VECTORIZE_PLUGIN_NAME]: VECTORIZE_PLUGIN, [VPC_SERVICES_PLUGIN_NAME]: VPC_SERVICES_PLUGIN, [MTLS_PLUGIN_NAME]: MTLS_PLUGIN, @@ -127,6 +129,7 @@ export type WorkerOptions = z.input & z.input & z.input & z.input & + z.input & z.input & z.input & z.input & @@ -145,6 +148,7 @@ export type SharedOptions = z.input & z.input & z.input & z.input & + z.input & z.input; export const PLUGIN_ENTRIES = Object.entries(PLUGINS) as [ @@ -206,6 +210,7 @@ export * from "./ai"; export * from "./browser-rendering"; export * from "./dispatch-namespace"; export * from "./images"; +export * from "./stream"; export * from "./vectorize"; export * from "./vpc-services"; export * from "./mtls"; diff --git a/packages/miniflare/src/plugins/stream/index.ts b/packages/miniflare/src/plugins/stream/index.ts new file mode 100644 index 0000000000..ebc2dc7b63 --- /dev/null +++ b/packages/miniflare/src/plugins/stream/index.ts @@ -0,0 +1,185 @@ +import fs from "node:fs/promises"; +import BINDING_SCRIPT from "worker:stream/binding"; +import OBJECT_SCRIPT from "worker:stream/object"; +import { z } from "zod"; +import { SharedBindings } from "../../workers"; +import { + getMiniflareObjectBindings, + getPersistPath, + getUserBindingServiceName, + PersistenceSchema, + Plugin, + ProxyNodeBinding, + remoteProxyClientWorker, + RemoteProxyConnectionString, +} from "../shared"; +import type { Service } from "../../runtime"; + +const StreamSchema = z.object({ + binding: z.string(), + remoteProxyConnectionString: z + .custom() + .optional(), +}); + +export const StreamOptionsSchema = z.object({ + stream: StreamSchema.optional(), +}); + +export const StreamSharedOptionsSchema = z.object({ + streamPersist: PersistenceSchema, +}); + +export const STREAM_PLUGIN_NAME = "stream"; +const STREAM_STORAGE_SERVICE_NAME = `${STREAM_PLUGIN_NAME}:storage`; +const STREAM_OBJECT_SERVICE_NAME = `${STREAM_PLUGIN_NAME}:object`; +export const STREAM_OBJECT_CLASS_NAME = "StreamObject"; + +export const STREAM_COMPAT_DATE = "2026-03-23"; + +export const STREAM_PLUGIN: Plugin< + typeof StreamOptionsSchema, + typeof StreamSharedOptionsSchema +> = { + options: StreamOptionsSchema, + sharedOptions: StreamSharedOptionsSchema, + async getBindings(options) { + if (!options.stream) { + return []; + } + + return [ + { + name: options.stream.binding, + service: { + name: getUserBindingServiceName( + STREAM_PLUGIN_NAME, + options.stream.binding, + options.stream.remoteProxyConnectionString + ), + entrypoint: "StreamBinding", + }, + }, + ]; + }, + getNodeBindings(options: z.infer) { + if (!options.stream) { + return {}; + } + return { + [options.stream.binding]: new ProxyNodeBinding(), + }; + }, + async getServices({ + options, + sharedOptions, + tmpPath, + defaultPersistRoot, + unsafeStickyBlobs, + }) { + if (!options.stream) { + return []; + } + + const serviceName = getUserBindingServiceName( + STREAM_PLUGIN_NAME, + options.stream.binding, + options.stream.remoteProxyConnectionString + ); + + if (options.stream.remoteProxyConnectionString) { + return [ + { + name: serviceName, + worker: remoteProxyClientWorker( + options.stream.remoteProxyConnectionString, + options.stream.binding + ), + }, + ]; + } + + const persistPath = getPersistPath( + STREAM_PLUGIN_NAME, + tmpPath, + defaultPersistRoot, + sharedOptions.streamPersist + ); + await fs.mkdir(persistPath, { recursive: true }); + + // Disk storage for blobs and SQL + const storageService = { + name: STREAM_STORAGE_SERVICE_NAME, + disk: { path: persistPath, writable: true }, + } satisfies Service; + + // StreamObject + const objectService = { + name: STREAM_OBJECT_SERVICE_NAME, + worker: { + compatibilityDate: STREAM_COMPAT_DATE, + compatibilityFlags: ["nodejs_compat", "experimental"], + modules: [ + { + name: "object.worker.js", + esModule: OBJECT_SCRIPT(), + }, + ], + durableObjectNamespaces: [ + { + className: STREAM_OBJECT_CLASS_NAME, + uniqueKey: `miniflare-${STREAM_OBJECT_CLASS_NAME}`, + enableSql: true, + }, + ], + durableObjectStorage: { localDisk: STREAM_STORAGE_SERVICE_NAME }, + bindings: [ + { + name: SharedBindings.MAYBE_SERVICE_BLOBS, + service: { name: STREAM_STORAGE_SERVICE_NAME }, + }, + ...getMiniflareObjectBindings(unsafeStickyBlobs), + ], + // Allow the DO to send outbound HTTP requests (fetching watermark images) + globalOutbound: { name: "internet" }, + }, + } satisfies Service; + + // Entrypoint with RPC + const bindingService = { + name: serviceName, + worker: { + compatibilityDate: STREAM_COMPAT_DATE, + compatibilityFlags: ["nodejs_compat", "experimental"], + modules: [ + { + name: "binding.worker.js", + esModule: BINDING_SCRIPT(), + }, + ], + bindings: [ + { + name: "store", + durableObjectNamespace: { + className: STREAM_OBJECT_CLASS_NAME, + serviceName: STREAM_OBJECT_SERVICE_NAME, + }, + }, + ], + // Allow the binding worker to send outbound HTTP requests + // (e.g. fetching video from URL in upload fn) + globalOutbound: { name: "internet" }, + }, + } satisfies Service; + + return [storageService, objectService, bindingService]; + }, + getPersistPath({ streamPersist }, tmpPath) { + return getPersistPath( + STREAM_PLUGIN_NAME, + tmpPath, + undefined, + streamPersist + ); + }, +}; diff --git a/packages/miniflare/src/workers/shared/index.worker.ts b/packages/miniflare/src/workers/shared/index.worker.ts index 840ca7191b..e126a93ee2 100644 --- a/packages/miniflare/src/workers/shared/index.worker.ts +++ b/packages/miniflare/src/workers/shared/index.worker.ts @@ -45,7 +45,7 @@ export { } from "./router.worker"; export type { RouteHandler } from "./router.worker"; -export { get, all, drain } from "./sql.worker"; +export { get, all, drain, createTypedSql } from "./sql.worker"; export type { TypedValue, TypedResult, diff --git a/packages/miniflare/src/workers/stream/binding.worker.ts b/packages/miniflare/src/workers/stream/binding.worker.ts new file mode 100644 index 0000000000..27bd299230 --- /dev/null +++ b/packages/miniflare/src/workers/stream/binding.worker.ts @@ -0,0 +1,259 @@ +import { RpcTarget, WorkerEntrypoint } from "cloudflare:workers"; +import { BadRequestError, InvalidURLError } from "./errors"; +import { + rowToStreamCaption, + rowToStreamDownload, + rowToStreamVideo, + rowToStreamWatermark, +} from "./schemas"; +import type { StreamObject } from "./object.worker"; + +interface Env { + store: DurableObjectNamespace; +} + +function getStub(env: Env): DurableObjectStub { + const id = env.store.idFromName("stream-data"); + return env.store.get(id); +} + +function rowsToDownloadResponse( + rows: { type: StreamDownloadType; download: StreamDownload }[] +): StreamDownloadGetResponse { + const result: StreamDownloadGetResponse = {}; + for (const { type, download } of rows) { + result[type] = download; + } + return result; +} + +export class StreamBinding extends WorkerEntrypoint { + async upload( + urlOrBody: string | ReadableStream, + params?: StreamUrlUploadParams + ): Promise { + let body: ReadableStream; + if (typeof urlOrBody === "string") { + const response = await fetch(urlOrBody); + if (!response.ok || response.body === null) { + throw new InvalidURLError( + `Failed to fetch video from URL: ${response.status} ${response.statusText}` + ); + } + body = response.body; + } else { + body = urlOrBody; + } + const stub = getStub(this.env); + const row = await stub.createVideo(body, params ?? {}); + return rowToStreamVideo(row); + } + + // Not supported in local mode yet + async createDirectUpload( + _params: StreamDirectUploadCreateParams + ): Promise { + throw new BadRequestError( + "createDirectUpload is not supported in local mode" + ); + } + + video(id: string): StreamVideoHandle { + return new StreamVideoHandleImpl(this.env, id); + } + + get videos(): StreamVideos { + return new StreamVideosImpl(this.env); + } + + get watermarks(): StreamWatermarks { + return new StreamWatermarksImpl(this.env); + } +} + +class StreamScopedCaptionsImpl + extends RpcTarget + implements StreamScopedCaptions +{ + readonly #env: Env; + readonly #videoId: string; + + constructor(env: Env, videoId: string) { + super(); + this.#env = env; + this.#videoId = videoId; + } + + async upload(_language: string, _file: File): Promise { + throw new BadRequestError( + "caption upload via File is not supported in local mode" + ); + } + + async generate(language: string): Promise { + const stub = getStub(this.#env); + const row = await stub.generateCaption(this.#videoId, language); + return rowToStreamCaption(row); + } + + async list(language?: string): Promise { + const stub = getStub(this.#env); + const rows = await stub.listCaptions(this.#videoId, language); + return rows.map(rowToStreamCaption); + } + + async delete(language: string): Promise { + const stub = getStub(this.#env); + await stub.deleteCaption(this.#videoId, language); + } +} + +class StreamScopedDownloadsImpl + extends RpcTarget + implements StreamScopedDownloads +{ + readonly #env: Env; + readonly #videoId: string; + + constructor(env: Env, videoId: string) { + super(); + this.#env = env; + this.#videoId = videoId; + } + + async generate( + downloadType: StreamDownloadType = "default" + ): Promise { + const stub = getStub(this.#env); + const rows = await stub.generateDownload(this.#videoId, downloadType); + return rowsToDownloadResponse(rows.map(rowToStreamDownload)); + } + + async get(): Promise { + const stub = getStub(this.#env); + const rows = await stub.listDownloads(this.#videoId); + return rowsToDownloadResponse(rows.map(rowToStreamDownload)); + } + + async delete(downloadType: StreamDownloadType = "default"): Promise { + const stub = getStub(this.#env); + await stub.deleteDownload(this.#videoId, downloadType); + } +} + +class StreamVideoHandleImpl extends RpcTarget implements StreamVideoHandle { + readonly id: string; + readonly #env: Env; + + constructor(env: Env, id: string) { + super(); + this.#env = env; + this.id = id; + } + + async details(): Promise { + const stub = getStub(this.#env); + const row = await stub.getVideo(this.id); + return rowToStreamVideo(row); + } + + async update(params: StreamUpdateVideoParams): Promise { + const stub = getStub(this.#env); + const row = await stub.updateVideo(this.id, params); + return rowToStreamVideo(row); + } + + async delete(): Promise { + const stub = getStub(this.#env); + await stub.deleteVideo(this.id); + } + + async generateToken(): Promise { + const stub = getStub(this.#env); + return stub.generateToken(this.id); + } + + get downloads(): StreamScopedDownloads { + return new StreamScopedDownloadsImpl(this.#env, this.id); + } + + get captions(): StreamScopedCaptions { + return new StreamScopedCaptionsImpl(this.#env, this.id); + } +} + +class StreamVideosImpl extends RpcTarget implements StreamVideos { + readonly #env: Env; + + constructor(env: Env) { + super(); + this.#env = env; + } + + async list(params?: StreamVideosListParams): Promise { + const stub = getStub(this.#env); + const rows = await stub.listVideos(params); + return rows.map(rowToStreamVideo); + } +} + +class StreamWatermarksImpl extends RpcTarget implements StreamWatermarks { + readonly #env: Env; + + constructor(env: Env) { + super(); + this.#env = env; + } + + async generate( + fileOrUrl: File | string | ReadableStream, + params: StreamWatermarkCreateParams + ): Promise { + if (fileOrUrl instanceof File) { + throw new BadRequestError( + "watermark generation via File is not supported in local mode" + ); + } + if ( + params.opacity !== undefined && + (params.opacity < 0 || params.opacity > 1) + ) { + throw new BadRequestError("opacity must be between 0.0 and 1.0"); + } + if ( + params.padding !== undefined && + (params.padding < 0 || params.padding > 1) + ) { + throw new BadRequestError("padding must be between 0.0 and 1.0"); + } + if (params.scale !== undefined && (params.scale < 0 || params.scale > 1)) { + throw new BadRequestError("scale must be between 0.0 and 1.0"); + } + const stub = getStub(this.#env); + if (typeof fileOrUrl === "string") { + const row = await stub.createWatermarkFromUrl(fileOrUrl, params); + return rowToStreamWatermark(row); + } + // ReadableStream — pre-fetched data passed directly + const buffer = await new Response(fileOrUrl).arrayBuffer(); + const row = await stub.createWatermarkFromBody(buffer, null, params); + return rowToStreamWatermark(row); + } + + async list(): Promise { + const stub = getStub(this.#env); + const rows = await stub.listWatermarks(); + return rows.map(rowToStreamWatermark); + } + + async get(watermarkId: string): Promise { + const stub = getStub(this.#env); + const row = await stub.getWatermark(watermarkId); + return rowToStreamWatermark(row); + } + + async delete(watermarkId: string): Promise { + const stub = getStub(this.#env); + await stub.deleteWatermark(watermarkId); + } +} diff --git a/packages/miniflare/src/workers/stream/errors.ts b/packages/miniflare/src/workers/stream/errors.ts new file mode 100644 index 0000000000..a633d7c4db --- /dev/null +++ b/packages/miniflare/src/workers/stream/errors.ts @@ -0,0 +1,31 @@ +export class StreamBindingError extends Error implements StreamError { + constructor( + message: string, + readonly code: number, + readonly statusCode: number + ) { + super(message); + this.name = "StreamBindingError"; + } +} + +export class BadRequestError extends StreamBindingError { + constructor(message = "Bad Request") { + super(message, 10005, 400); + this.name = "BadRequestError"; + } +} + +export class NotFoundError extends StreamBindingError { + constructor(message = "Not Found") { + super(message, 10003, 404); + this.name = "NotFoundError"; + } +} + +export class InvalidURLError extends StreamBindingError { + constructor(message = "Invalid URL") { + super(message, 10010, 400); + this.name = "InvalidURLError"; + } +} diff --git a/packages/miniflare/src/workers/stream/object.worker.ts b/packages/miniflare/src/workers/stream/object.worker.ts new file mode 100644 index 0000000000..178c8bd659 --- /dev/null +++ b/packages/miniflare/src/workers/stream/object.worker.ts @@ -0,0 +1,623 @@ +import { DurableObject } from "cloudflare:workers"; +import { + all, + BlobStore, + createTypedSql, + get, + SharedBindings, + Timers, +} from "miniflare:shared"; +import { BadRequestError, InvalidURLError, NotFoundError } from "./errors"; +import { SQL_SCHEMA } from "./schemas"; +import type { + CaptionRow, + DownloadRow, + VideoRow, + WatermarkRow, +} from "./schemas"; +import type { BlobId, TypedSql } from "miniflare:shared"; + +const BLOB_NAMESPACE = "stream-data"; + +interface Env { + MINIFLARE_BLOBS?: Fetcher; + MINIFLARE_STICKY_BLOBS?: boolean; + [SharedBindings.MAYBE_JSON_ENABLE_CONTROL_ENDPOINTS]?: boolean; +} + +export class StreamObject extends DurableObject { + readonly timers = new Timers(); + readonly #blob: BlobStore; + readonly #db: TypedSql; + readonly #stmts: ReturnType; + + #now() { + return new Date(this.timers.now()).toISOString(); + } + + constructor(state: DurableObjectState, env: Env) { + super(state, env); + const db = createTypedSql(state.storage); + db.exec("PRAGMA foreign_keys = ON"); + db.exec(SQL_SCHEMA); + this.#db = db; + this.#stmts = sqlStmts(db, () => this.#now()); + const stickyBlobs = !!env.MINIFLARE_STICKY_BLOBS; + this.#blob = new BlobStore( + env.MINIFLARE_BLOBS as Fetcher, + BLOB_NAMESPACE, + stickyBlobs + ); + } + + async createVideo( + body: ReadableStream | null, + params: StreamUrlUploadParams + ): Promise { + const id = crypto.randomUUID(); + const now = this.#now(); + + let blobId: BlobId | null = null; + let size = 0; + + if (body !== null) { + // Count bytes while streaming through to blob storage + const { readable, writable } = new TransformStream< + Uint8Array, + Uint8Array + >({ + transform(chunk, controller) { + size += chunk.byteLength; + controller.enqueue(chunk); + }, + }); + [blobId] = await Promise.all([ + this.#blob.put(readable), + body.pipeTo(writable), + ]); + } + + this.#stmts.insertVideo({ + id, + creator: params.creator ?? null, + meta: JSON.stringify(params.meta ?? {}), + allowed_origins: JSON.stringify(params.allowedOrigins ?? []), + require_signed_urls: params.requireSignedURLs ? 1 : 0, + scheduled_deletion: params.scheduledDeletion ?? null, + thumbnail_timestamp_pct: params.thumbnailTimestampPct ?? 0, + created: now, + modified: now, + uploaded: body !== null ? now : null, + status_state: body !== null ? "ready" : "pendingupload", + ready_to_stream: body !== null ? 1 : 0, + size, + blob_id: blobId, + }); + + const row = get(this.#stmts.getVideo({ id })); + if (row === undefined) throw new NotFoundError(`Video not found: ${id}`); + return row; + } + + async getVideo(id: string): Promise { + const row = get(this.#stmts.getVideo({ id })); + if (row === undefined) throw new NotFoundError(`Video not found: ${id}`); + return row; + } + + async updateVideo( + id: string, + params: StreamUpdateVideoParams + ): Promise { + return this.#stmts.updateVideo(id, params); + } + + async deleteVideo(id: string): Promise { + const blobIds = this.#stmts.deleteVideo(id); + await Promise.all(blobIds.map((b) => this.#blob.delete(b))); + } + + async listVideos(params?: StreamVideosListParams): Promise { + if (params?.before === undefined && params?.after === undefined) { + if (params?.limit === undefined) { + return all(this.#stmts.listVideos({})); + } + return all(this.#stmts.listVideosLimit({ limit: params.limit })); + } + + const compToSql: Record = { + eq: "=", + gt: ">", + gte: ">=", + lt: "<", + lte: "<=", + }; + const conditions: string[] = []; + const values: (string | number)[] = []; + + if (params.before !== undefined) { + const op = compToSql[params.beforeComp ?? "lt"]; + if (op === undefined) { + throw new BadRequestError( + "Invalid comparison operator: " + String(params.beforeComp) + ); + } + conditions.push("created " + op + " ?"); + values.push(params.before); + } + if (params.after !== undefined) { + const op = compToSql[params.afterComp ?? "gte"]; + if (op === undefined) { + throw new BadRequestError( + "Invalid comparison operator: " + String(params.afterComp) + ); + } + conditions.push("created " + op + " ?"); + values.push(params.after); + } + + values.push(params.limit ?? 1000); + const sql = + "SELECT * FROM _mf_stream_videos WHERE " + + conditions.join(" AND ") + + " ORDER BY created DESC LIMIT ?"; + return Array.from(this.#db.exec(sql, ...values)); + } + + async generateToken(id: string): Promise { + const row = get(this.#stmts.getVideo({ id })); + if (row === undefined) throw new NotFoundError(`Video not found: ${id}`); + + const payload = { + sub: id, + kid: "local-mode-key", + exp: Math.floor(this.timers.now() / 1000) + 6 * 60 * 60, + }; + return btoa(JSON.stringify(payload)); + } + + async generateCaption( + videoId: string, + language: string + ): Promise { + const video = get(this.#stmts.getVideo({ id: videoId })); + if (video === undefined) + throw new NotFoundError(`Video not found: ${videoId}`); + + const label = + new Intl.DisplayNames(["en"], { type: "language" }).of(language) ?? + language; + + this.#stmts.upsertCaption({ + video_id: videoId, + language, + generated: 1, + label, + status: "ready", + }); + + const row = get(this.#stmts.getCaption({ video_id: videoId, language })); + if (row === undefined) + throw new NotFoundError(`Caption not found: ${videoId}/${language}`); + return row; + } + + async listCaptions( + videoId: string, + language?: string + ): Promise { + const video = get(this.#stmts.getVideo({ id: videoId })); + if (video === undefined) + throw new NotFoundError(`Video not found: ${videoId}`); + + if (language !== undefined) { + const row = get(this.#stmts.getCaption({ video_id: videoId, language })); + return row !== undefined ? [row] : []; + } + return all(this.#stmts.listCaptionsByVideo({ video_id: videoId })); + } + + async deleteCaption(videoId: string, language: string): Promise { + const deleted = get( + this.#stmts.deleteCaption({ video_id: videoId, language }) + ); + if (deleted === undefined) { + throw new NotFoundError(`Caption not found: ${videoId}/${language}`); + } + if (deleted.blob_id !== null) { + await this.#blob.delete(deleted.blob_id); + } + } + + async createWatermarkFromUrl( + url: string, + params: StreamWatermarkCreateParams + ): Promise { + const response = await fetch(url); + if (!response.ok || response.body === null) { + throw new InvalidURLError( + `Failed to fetch watermark from URL: ${response.status} ${response.statusText}` + ); + } + + return this.createWatermarkFromBody( + await response.arrayBuffer(), + url, + params + ); + } + + async createWatermarkFromBody( + buffer: ArrayBuffer, + downloadedFrom: string | null, + params: StreamWatermarkCreateParams + ): Promise { + const size = buffer.byteLength; + const blobId = await this.#blob.put( + new Response(buffer).body as ReadableStream + ); + + const id = crypto.randomUUID(); + const now = this.#now(); + + this.#stmts.insertWatermark({ + id, + name: params.name ?? "", + size, + created: now, + downloaded_from: downloadedFrom, + opacity: params.opacity ?? 1.0, + padding: params.padding ?? 0.05, + scale: params.scale ?? 0.15, + position: params.position ?? "upperRight", + blob_id: blobId, + }); + + const row = get(this.#stmts.getWatermark({ id })); + if (row === undefined) + throw new NotFoundError(`Watermark not found: ${id}`); + return row; + } + + async getWatermark(id: string): Promise { + const row = get(this.#stmts.getWatermark({ id })); + if (row === undefined) + throw new NotFoundError(`Watermark not found: ${id}`); + return row; + } + + async listWatermarks(): Promise { + return all(this.#stmts.listWatermarks({})); + } + + async deleteWatermark(id: string): Promise { + const deleted = get(this.#stmts.deleteWatermark({ id })); + if (deleted === undefined) + throw new NotFoundError(`Watermark not found: ${id}`); + if (deleted.blob_id !== null) { + await this.#blob.delete(deleted.blob_id); + } + } + + async generateDownload( + videoId: string, + downloadType: StreamDownloadType = "default" + ): Promise { + const video = get(this.#stmts.getVideo({ id: videoId })); + if (video === undefined) + throw new NotFoundError(`Video not found: ${videoId}`); + + this.#stmts.upsertDownload({ + video_id: videoId, + download_type: downloadType, + status: "ready", + percent_complete: 100.0, + }); + + return all(this.#stmts.listDownloads({ video_id: videoId })); + } + + async listDownloads(videoId: string): Promise { + const video = get(this.#stmts.getVideo({ id: videoId })); + if (video === undefined) + throw new NotFoundError(`Video not found: ${videoId}`); + return all(this.#stmts.listDownloads({ video_id: videoId })); + } + + async deleteDownload( + videoId: string, + downloadType: StreamDownloadType = "default" + ): Promise { + const deleted = get( + this.#stmts.deleteDownload({ + video_id: videoId, + download_type: downloadType, + }) + ); + if (deleted === undefined) { + throw new NotFoundError(`Download not found: ${videoId}/${downloadType}`); + } + } + + async fetch( + req: Request< + unknown, + { miniflare?: { controlOp?: { name: string; args?: unknown[] } } } + > + ) { + if (this.env[SharedBindings.MAYBE_JSON_ENABLE_CONTROL_ENDPOINTS] === true) { + const controlOp = req.cf?.miniflare?.controlOp; + if (controlOp !== undefined) { + const args = controlOp.args ?? []; + switch (controlOp.name) { + case "enableFakeTimers": + await this.timers.enableFakeTimers(args[0] as number); + return Response.json(null); + case "disableFakeTimers": + await this.timers.disableFakeTimers(); + return Response.json(null); + case "advanceFakeTime": + await this.timers.advanceFakeTime(args[0] as number); + return Response.json(null); + case "waitForFakeTasks": + await this.timers.waitForFakeTasks(); + return Response.json(null); + } + } + } + + return new Response(null, { status: 404 }); + } +} + +// Helper functions to return all db statements +function sqlStmts(db: TypedSql, now: () => string) { + // Videos + + const stmtGetVideo = db.stmt, VideoRow>( + "SELECT * FROM _mf_stream_videos WHERE id = :id" + ); + + const stmtInsertVideo = db.stmt< + Pick< + VideoRow, + | "id" + | "creator" + | "meta" + | "allowed_origins" + | "require_signed_urls" + | "scheduled_deletion" + | "thumbnail_timestamp_pct" + | "created" + | "modified" + | "uploaded" + | "status_state" + | "ready_to_stream" + | "size" + | "blob_id" + > + >(`INSERT INTO _mf_stream_videos ( + id, creator, meta, allowed_origins, require_signed_urls, + scheduled_deletion, thumbnail_timestamp_pct, created, modified, + uploaded, status_state, ready_to_stream, size, blob_id + ) VALUES ( + :id, :creator, :meta, :allowed_origins, :require_signed_urls, + :scheduled_deletion, :thumbnail_timestamp_pct, :created, :modified, + :uploaded, :status_state, :ready_to_stream, :size, :blob_id + )`); + + const stmtUpdateVideo = db.stmt< + Pick< + VideoRow, + | "id" + | "modified" + | "creator" + | "meta" + | "allowed_origins" + | "require_signed_urls" + | "scheduled_deletion" + | "thumbnail_timestamp_pct" + | "max_duration_seconds" + > + >(`UPDATE _mf_stream_videos SET + modified = :modified, + creator = :creator, + meta = :meta, + allowed_origins = :allowed_origins, + require_signed_urls = :require_signed_urls, + scheduled_deletion = :scheduled_deletion, + thumbnail_timestamp_pct = :thumbnail_timestamp_pct, + max_duration_seconds = :max_duration_seconds + WHERE id = :id`); + + const stmtGetVideoCaptionBlobs = db.stmt< + Pick, + Pick + >("SELECT blob_id FROM _mf_stream_captions WHERE video_id = :id"); + + const stmtDeleteVideoDownloads = db.stmt>( + "DELETE FROM _mf_stream_downloads WHERE video_id = :id" + ); + + const stmtDeleteVideo = db.stmt< + Pick, + Pick + >("DELETE FROM _mf_stream_videos WHERE id = :id RETURNING blob_id"); + + const stmtListVideos = db.stmt, VideoRow>( + "SELECT * FROM _mf_stream_videos ORDER BY created DESC" + ); + + const stmtListVideosLimit = db.stmt<{ limit: number }, VideoRow>( + "SELECT * FROM _mf_stream_videos ORDER BY created DESC LIMIT :limit" + ); + + // Captions + + const stmtGetCaption = db.stmt< + Pick, + CaptionRow + >( + "SELECT * FROM _mf_stream_captions WHERE video_id = :video_id AND language = :language" + ); + + const stmtUpsertCaption = db.stmt< + Pick + >(`INSERT INTO _mf_stream_captions (video_id, language, generated, label, status) + VALUES (:video_id, :language, :generated, :label, :status) + ON CONFLICT (video_id, language) DO UPDATE SET + generated = excluded.generated, + label = excluded.label, + status = excluded.status`); + + const stmtListCaptionsByVideo = db.stmt< + Pick, + CaptionRow + >("SELECT * FROM _mf_stream_captions WHERE video_id = :video_id"); + + const stmtDeleteCaption = db.stmt< + Pick, + Pick + >( + "DELETE FROM _mf_stream_captions WHERE video_id = :video_id AND language = :language RETURNING blob_id" + ); + + // Watermarks + + const stmtGetWatermark = db.stmt, WatermarkRow>( + "SELECT * FROM _mf_stream_watermarks WHERE id = :id" + ); + + const stmtInsertWatermark = db.stmt< + Pick< + WatermarkRow, + | "id" + | "name" + | "size" + | "created" + | "downloaded_from" + | "opacity" + | "padding" + | "scale" + | "position" + | "blob_id" + > + >(`INSERT INTO _mf_stream_watermarks + (id, name, size, height, width, created, downloaded_from, opacity, padding, scale, position, blob_id) + VALUES (:id, :name, :size, 0, 0, :created, :downloaded_from, :opacity, :padding, :scale, :position, :blob_id)`); + + const stmtListWatermarks = db.stmt, WatermarkRow>( + "SELECT * FROM _mf_stream_watermarks ORDER BY created DESC" + ); + + const stmtDeleteWatermark = db.stmt< + Pick, + Pick + >("DELETE FROM _mf_stream_watermarks WHERE id = :id RETURNING blob_id"); + + // Downloads + + const stmtUpsertDownload = db.stmt< + Pick< + DownloadRow, + "video_id" | "download_type" | "status" | "percent_complete" + > + >(`INSERT INTO _mf_stream_downloads (video_id, download_type, status, percent_complete) + VALUES (:video_id, :download_type, :status, :percent_complete) + ON CONFLICT (video_id, download_type) DO UPDATE SET + status = excluded.status, + percent_complete = excluded.percent_complete`); + + const stmtListDownloads = db.stmt, DownloadRow>( + "SELECT * FROM _mf_stream_downloads WHERE video_id = :video_id" + ); + + const stmtDeleteDownload = db.stmt< + Pick, + Pick + >( + "DELETE FROM _mf_stream_downloads WHERE video_id = :video_id AND download_type = :download_type RETURNING video_id" + ); + + // Operations using transactions + + const deleteVideo = db.txn((id: string): BlobId[] => { + // Collect caption blob_ids before CASCADE deletes rows + const captionBlobs = all(stmtGetVideoCaptionBlobs({ id })) + .map((r) => r.blob_id) + .filter((b): b is BlobId => b !== null); + + stmtDeleteVideoDownloads({ id }); + + const videoRow = get(stmtDeleteVideo({ id })); + if (videoRow === undefined) + throw new NotFoundError(`Video not found: ${id}`); + + const blobIds = captionBlobs; + if (videoRow.blob_id !== null) blobIds.push(videoRow.blob_id); + return blobIds; + }); + + const updateVideo = db.txn( + (id: string, params: StreamUpdateVideoParams): VideoRow => { + const current = get(stmtGetVideo({ id })); + if (current === undefined) + throw new NotFoundError(`Video not found: ${id}`); + + const nowValue = now(); + stmtUpdateVideo({ + id, + modified: nowValue, + creator: + "creator" in params ? (params.creator ?? null) : current.creator, + meta: + params.meta !== undefined + ? JSON.stringify(params.meta) + : current.meta, + allowed_origins: + params.allowedOrigins !== undefined + ? JSON.stringify(params.allowedOrigins) + : current.allowed_origins, + require_signed_urls: + params.requireSignedURLs !== undefined + ? params.requireSignedURLs + ? 1 + : 0 + : current.require_signed_urls, + scheduled_deletion: + "scheduledDeletion" in params + ? (params.scheduledDeletion ?? null) + : current.scheduled_deletion, + thumbnail_timestamp_pct: + params.thumbnailTimestampPct ?? current.thumbnail_timestamp_pct, + max_duration_seconds: + params.maxDurationSeconds ?? current.max_duration_seconds, + }); + + const updated = get(stmtGetVideo({ id })); + if (updated === undefined) + throw new NotFoundError(`Video not found: ${id}`); + return updated; + } + ); + + return { + getVideo: stmtGetVideo, + insertVideo: stmtInsertVideo, + updateVideo, + deleteVideo, + listVideos: stmtListVideos, + listVideosLimit: stmtListVideosLimit, + getCaption: stmtGetCaption, + upsertCaption: stmtUpsertCaption, + listCaptionsByVideo: stmtListCaptionsByVideo, + deleteCaption: stmtDeleteCaption, + getWatermark: stmtGetWatermark, + insertWatermark: stmtInsertWatermark, + listWatermarks: stmtListWatermarks, + deleteWatermark: stmtDeleteWatermark, + upsertDownload: stmtUpsertDownload, + listDownloads: stmtListDownloads, + deleteDownload: stmtDeleteDownload, + }; +} diff --git a/packages/miniflare/src/workers/stream/schemas.ts b/packages/miniflare/src/workers/stream/schemas.ts new file mode 100644 index 0000000000..d5f016e38c --- /dev/null +++ b/packages/miniflare/src/workers/stream/schemas.ts @@ -0,0 +1,207 @@ +export const SQL_SCHEMA = ` +CREATE TABLE IF NOT EXISTS _mf_stream_videos ( + id TEXT PRIMARY KEY, + creator TEXT, + thumbnail TEXT NOT NULL DEFAULT '', + thumbnail_timestamp_pct REAL NOT NULL DEFAULT 0.0, + ready_to_stream INTEGER NOT NULL DEFAULT 1, + ready_to_stream_at TEXT, + status_state TEXT NOT NULL DEFAULT 'ready', + status_pct_complete TEXT, + status_error_reason_code TEXT NOT NULL DEFAULT '', + status_error_reason_text TEXT NOT NULL DEFAULT '', + meta TEXT NOT NULL DEFAULT '{}', + created TEXT NOT NULL, + modified TEXT NOT NULL, + scheduled_deletion TEXT, + size INTEGER NOT NULL DEFAULT 0, + allowed_origins TEXT NOT NULL DEFAULT '[]', + require_signed_urls INTEGER, + uploaded TEXT, + upload_expiry TEXT, + max_size_bytes INTEGER, + max_duration_seconds INTEGER, + duration REAL NOT NULL DEFAULT -1.0, + input_width INTEGER NOT NULL DEFAULT 0, + input_height INTEGER NOT NULL DEFAULT 0, + live_input_id TEXT, + clipped_from_id TEXT, + blob_id TEXT +); + +CREATE TABLE IF NOT EXISTS _mf_stream_captions ( + video_id TEXT NOT NULL, + language TEXT NOT NULL, + generated INTEGER NOT NULL DEFAULT 0, + label TEXT NOT NULL DEFAULT '', + status TEXT, + blob_id TEXT, + PRIMARY KEY (video_id, language), + FOREIGN KEY (video_id) REFERENCES _mf_stream_videos(id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS _mf_stream_watermarks ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL DEFAULT '', + size INTEGER NOT NULL DEFAULT 0, + height INTEGER NOT NULL DEFAULT 0, + width INTEGER NOT NULL DEFAULT 0, + created TEXT NOT NULL, + downloaded_from TEXT, + opacity REAL NOT NULL DEFAULT 1.0, + padding REAL NOT NULL DEFAULT 0.05, + scale REAL NOT NULL DEFAULT 0.15, + position TEXT NOT NULL DEFAULT 'upperRight', + blob_id TEXT +); + +CREATE TABLE IF NOT EXISTS _mf_stream_downloads ( + video_id TEXT NOT NULL, + download_type TEXT NOT NULL DEFAULT 'default', + status TEXT NOT NULL DEFAULT 'ready', + percent_complete REAL NOT NULL DEFAULT 100.0, + url TEXT, + PRIMARY KEY (video_id, download_type), + FOREIGN KEY (video_id) REFERENCES _mf_stream_videos(id) ON DELETE CASCADE +); +`; + +export type VideoRow = { + id: string; + creator: string | null; + thumbnail: string; + thumbnail_timestamp_pct: number; + ready_to_stream: number; + ready_to_stream_at: string | null; + status_state: string; + status_pct_complete: string | null; + status_error_reason_code: string; + status_error_reason_text: string; + meta: string; + created: string; + modified: string; + scheduled_deletion: string | null; + size: number; + allowed_origins: string; + require_signed_urls: number | null; + uploaded: string | null; + upload_expiry: string | null; + max_size_bytes: number | null; + max_duration_seconds: number | null; + duration: number; + input_width: number; + input_height: number; + live_input_id: string | null; + clipped_from_id: string | null; + blob_id: string | null; +}; + +export type CaptionRow = { + video_id: string; + language: string; + generated: number; + label: string; + status: string | null; + blob_id: string | null; +}; + +export type WatermarkRow = { + id: string; + name: string; + size: number; + height: number; + width: number; + created: string; + downloaded_from: string | null; + opacity: number; + padding: number; + scale: number; + position: string; + blob_id: string | null; +}; + +export type DownloadRow = { + video_id: string; + download_type: StreamDownloadType; + status: string; + percent_complete: number; + url: string | null; +}; + +export function rowToStreamVideo(row: VideoRow): StreamVideo { + const baseUrl = `https://customer-placeholder.cloudflarestream.com/${row.id}`; + return { + id: row.id, + creator: row.creator, + thumbnail: row.thumbnail || `${baseUrl}/thumbnails/thumbnail.jpg`, + thumbnailTimestampPct: row.thumbnail_timestamp_pct, + readyToStream: row.ready_to_stream === 1, + readyToStreamAt: row.ready_to_stream_at, + status: { + state: row.status_state, + pctComplete: row.status_pct_complete ?? undefined, + errorReasonCode: row.status_error_reason_code, + errorReasonText: row.status_error_reason_text, + }, + meta: JSON.parse(row.meta) as Record, + created: row.created, + modified: row.modified, + scheduledDeletion: row.scheduled_deletion, + size: row.size, + preview: `${baseUrl}/watch`, + allowedOrigins: JSON.parse(row.allowed_origins) as string[], + requireSignedURLs: + row.require_signed_urls === null ? null : row.require_signed_urls === 1, + uploaded: row.uploaded, + uploadExpiry: row.upload_expiry, + maxSizeBytes: row.max_size_bytes, + maxDurationSeconds: row.max_duration_seconds, + duration: row.duration, + input: { width: row.input_width, height: row.input_height }, + hlsPlaybackUrl: `${baseUrl}/manifest/video.m3u8`, + dashPlaybackUrl: `${baseUrl}/manifest/video.mpd`, + watermark: null, + liveInputId: row.live_input_id, + clippedFromId: row.clipped_from_id, + publicDetails: null, + }; +} + +export function rowToStreamCaption(row: CaptionRow): StreamCaption { + return { + language: row.language, + label: row.label || row.language, + generated: row.generated === 1, + status: row.status as StreamCaption["status"], + }; +} + +export function rowToStreamWatermark(row: WatermarkRow): StreamWatermark { + return { + id: row.id, + name: row.name, + size: row.size, + height: row.height, + width: row.width, + created: row.created, + downloadedFrom: row.downloaded_from, + opacity: row.opacity, + padding: row.padding, + scale: row.scale, + position: row.position as StreamWatermarkPosition, + }; +} + +export function rowToStreamDownload(row: DownloadRow): { + type: StreamDownloadType; + download: StreamDownload; +} { + return { + type: row.download_type, + download: { + percentComplete: row.percent_complete, + status: row.status as StreamDownloadStatus, + url: row.url ?? undefined, + }, + }; +} diff --git a/packages/miniflare/test/plugins/stream/index.spec.ts b/packages/miniflare/test/plugins/stream/index.spec.ts new file mode 100644 index 0000000000..6c9fbf30d6 --- /dev/null +++ b/packages/miniflare/test/plugins/stream/index.spec.ts @@ -0,0 +1,1502 @@ +import http from "node:http"; +import { pathToFileURL } from "node:url"; +import { + Miniflare, + STREAM_COMPAT_DATE, + STREAM_OBJECT_CLASS_NAME, + STREAM_PLUGIN_NAME, +} from "miniflare"; +import { describe, test } from "vitest"; +import { + MiniflareDurableObjectControlStub, + useDispose, + useServer, + useTmp, +} from "../../test-shared"; +import type { + StreamCaption as Caption, + StreamDownloadGetResponse as DownloadGetResponse, + StreamVideo as Video, + StreamWatermark as Watermark, +} from "@cloudflare/workers-types"; +import type { MiniflareOptions } from "miniflare"; + +// Mock image / video bytes +const TEST_VIDEO_BYTES = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]); +const TEST_IMAGE_BYTES = new Uint8Array([255, 216, 255, 224]); +const STREAM_OBJECT_NAME = "stream-data"; +const FAKE_TIME_START = 1_000_000; + +function staticBytesListener(bytes: Uint8Array): http.RequestListener { + return (_req, res) => { + res.writeHead(200, { "Content-Type": "application/octet-stream" }); + res.end(Buffer.from(bytes)); + }; +} + +function statusListener( + statusCode: number, + statusMessage?: string +): http.RequestListener { + return (_req, res) => { + res.writeHead(statusCode, statusMessage); + res.end(); + }; +} + +const WORKER_SCRIPT = ` +export default { + async fetch(request, env) { + try { + const { op, args } = await request.json(); + const stream = env.STREAM; + const result = await handleCommand(stream, op, args || {}); + return Response.json({ ok: true, result }); + } catch (err) { + return Response.json({ + ok: false, + error: err.message, + name: err.name, + code: err.code, + statusCode: err.statusCode, + }, { status: 200 }); + } + } +} + +async function handleCommand(stream, op, args) { + switch (op) { + case "upload": { + const resp = await fetch(args.url); + return stream.upload(resp.body, args.params); + } + case "upload.fromUrl": + return stream.upload(args.url, args.params); + case "video.details": + return stream.video(args.id).details(); + case "video.update": + return stream.video(args.id).update(args.params); + case "video.delete": + await stream.video(args.id).delete(); + return null; + case "video.generateToken": + return stream.video(args.id).generateToken(); + case "videos.list": + return stream.videos.list(args.params); + case "captions.generate": + return stream.video(args.id).captions.generate(args.language); + case "captions.list": + return stream.video(args.id).captions.list(args.language); + case "captions.delete": + await stream.video(args.id).captions.delete(args.language); + return null; + case "captions.upload": { + const file = new File(["test"], "captions.vtt"); + return stream.video(args.id).captions.upload(args.language, file); + } + case "downloads.generate": + return stream.video(args.id).downloads.generate(args.type); + case "downloads.get": + return stream.video(args.id).downloads.get(); + case "downloads.delete": + await stream.video(args.id).downloads.delete(args.type); + return null; + case "watermarks.generate": { + const resp = await fetch(args.url); + return stream.watermarks.generate(resp.body, args.params || {}); + } + case "watermarks.generate.fromUrl": + return stream.watermarks.generate(args.url, args.params || {}); + case "watermarks.generate.fromFile": { + const file = new File(["test"], "watermark.png"); + return stream.watermarks.generate(file, args.params || {}); + } + case "watermarks.list": + return stream.watermarks.list(); + case "watermarks.get": + return stream.watermarks.get(args.id); + case "watermarks.delete": + await stream.watermarks.delete(args.id); + return null; + case "createDirectUpload": + return stream.createDirectUpload(args.params || {}); + default: + throw new Error("Unknown op: " + op); + } +} +`; + +function createMiniflare(options: Partial = {}): Miniflare { + return new Miniflare({ + compatibilityDate: STREAM_COMPAT_DATE, + stream: { binding: "STREAM" }, + streamPersist: false, + modules: true, + script: WORKER_SCRIPT, + ...options, + } as MiniflareOptions); +} + +async function getStreamObjectControl( + mf: Miniflare +): Promise { + const objectNamespace = await mf._getInternalDurableObjectNamespace( + STREAM_PLUGIN_NAME, + "stream:object", + STREAM_OBJECT_CLASS_NAME + ); + const objectId = objectNamespace.idFromName(STREAM_OBJECT_NAME); + const objectStub = objectNamespace.get(objectId); + return new MiniflareDurableObjectControlStub(objectStub); +} + +type CmdError = Error & { name: string; code?: number; statusCode?: number }; + +async function sendCmdToWorker( + mf: Miniflare, + op: string, + args: Record = {} +): Promise { + const resp = await mf.dispatchFetch("http://placeholder", { + method: "POST", + body: JSON.stringify({ op, args }), + headers: { "Content-Type": "application/json" }, + }); + const data = (await resp.json()) as { + ok: boolean; + result: unknown; + error?: string; + name?: string; + code?: number; + statusCode?: number; + }; + if (!data.ok) { + const err: CmdError = new Error(data.error); + err.name = data.name ?? "Error"; + err.code = data.code; + err.statusCode = data.statusCode; + throw err; + } + return data.result; +} + +describe("Stream videos", () => { + test("upload and retrieve details", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + expect(video.id).toBeTruthy(); + expect(video.readyToStream).toBe(true); + expect(video.status.state).toBe("ready"); + expect(video.size).toBe(TEST_VIDEO_BYTES.byteLength); + expect(video.created).toBeTruthy(); + expect(video.modified).toBeTruthy(); + expect(video.hlsPlaybackUrl).toContain(video.id); + expect(video.dashPlaybackUrl).toContain(video.id); + + const details = (await sendCmdToWorker(mf, "video.details", { + id: video.id, + })) as Video; + expect(details.id).toBe(video.id); + expect(details.readyToStream).toBe(true); + }); + + test("upload with params", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + params: { + creator: "test-creator", + meta: { title: "Test Video" }, + requireSignedURLs: true, + thumbnailTimestampPct: 0.5, + }, + })) as Video; + + expect(video.creator).toBe("test-creator"); + expect(video.meta).toEqual({ title: "Test Video" }); + expect(video.requireSignedURLs).toBe(true); + expect(video.thumbnailTimestampPct).toBe(0.5); + }); + + test("upload from URL propagates fetch failures", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer(statusListener(500, "Boom")); + + await expect( + sendCmdToWorker(mf, "upload.fromUrl", { + url: videoUrl.toString(), + }) + ).rejects.toThrow("Failed to fetch video from URL: 500 Boom"); + }); + + test("throw when getting details for non existent video", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "video.details", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); + + test("update video metadata", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const originalModified = video.modified; + + const updated = (await sendCmdToWorker(mf, "video.update", { + id: video.id, + params: { + creator: "new-creator", + meta: { description: "Updated" }, + }, + })) as Video; + + expect(updated.id).toBe(video.id); + expect(updated.creator).toBe("new-creator"); + expect(updated.meta).toEqual({ description: "Updated" }); + expect(updated.modified).not.toBe(originalModified); + }); + + test("throws when updating non existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "video.update", { + id: "00000000-0000-0000-0000-000000000000", + params: { creator: "nobody" }, + }) + ).rejects.toThrow("Video not found"); + }); + + test("delete video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "video.delete", { id: video.id }); + + await expect( + sendCmdToWorker(mf, "video.details", { id: video.id }) + ).rejects.toThrow("Video not found"); + }); + + test("throws when deleting non existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "video.delete", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); + + test("partial update preserves untouched fields", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + params: { + creator: "original-creator", + meta: { title: "Original" }, + requireSignedURLs: true, + thumbnailTimestampPct: 0.3, + }, + })) as Video; + + // Only update creator — all other fields should be preserved + const updated = (await sendCmdToWorker(mf, "video.update", { + id: video.id, + params: { creator: "new-creator" }, + })) as Video; + + expect(updated.creator).toBe("new-creator"); + expect(updated.meta).toEqual({ title: "Original" }); + expect(updated.requireSignedURLs).toBe(true); + expect(updated.thumbnailTimestampPct).toBe(0.3); + }); + + test("update can null-clear creator and scheduledDeletion", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + params: { + creator: "will-be-cleared", + scheduledDeletion: new Date(Date.now() + 86_400_000).toISOString(), + }, + })) as Video; + + expect(video.creator).toBe("will-be-cleared"); + expect(video.scheduledDeletion).toBeTruthy(); + + const updated = (await sendCmdToWorker(mf, "video.update", { + id: video.id, + params: { creator: null, scheduledDeletion: null }, + })) as Video; + + expect(updated.creator).toBeNull(); + expect(updated.scheduledDeletion).toBeNull(); + }); + + test("generate token", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const token = (await sendCmdToWorker(mf, "video.generateToken", { + id: video.id, + })) as string; + + expect(typeof token).toBe("string"); + // Token is b64 encoded JSON + const payload = JSON.parse(atob(token)) as { + sub: string; + kid: string; + exp: number; + }; + expect(payload.sub).toBe(video.id); + expect(payload.kid).toBe("local-mode-key"); + expect(payload.exp).toBeGreaterThan(Math.floor(Date.now() / 1000)); + }); + + test("generate token for non-existent video throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "video.generateToken", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); +}); + +describe("Stream videos list", () => { + test("list empty", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + const videos = (await sendCmdToWorker(mf, "videos.list")) as Video[]; + expect(videos).toEqual([]); + }); + + test("list all videos", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + + const videos = (await sendCmdToWorker(mf, "videos.list")) as Video[]; + expect(videos).toHaveLength(3); + for (const v of videos) { + expect(v.id).toBeTruthy(); + } + }); + + test("list with limit", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + for (let i = 0; i < 5; i++) { + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + } + + const limited = (await sendCmdToWorker(mf, "videos.list", { + params: { limit: 2 }, + })) as Video[]; + expect(limited).toHaveLength(2); + }); + + test("list rejects invalid before comparison operator", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "videos.list", { + params: { before: new Date().toISOString(), beforeComp: "wat" }, + }) + ).rejects.toThrow("Invalid comparison operator: wat"); + }); + + test("list rejects invalid after comparison operator", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "videos.list", { + params: { after: new Date().toISOString(), afterComp: "wat" }, + }) + ).rejects.toThrow("Invalid comparison operator: wat"); + }); + + test("list ordered by created descending", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); + + const v1 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await object.advanceFakeTime(5); + const v2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await object.advanceFakeTime(5); + const v3 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + const videos = (await sendCmdToWorker(mf, "videos.list")) as Video[]; + // Newest first + expect(videos[0].id).toBe(v3.id); + expect(videos[1].id).toBe(v2.id); + expect(videos[2].id).toBe(v1.id); + }); + + test("list filters by before date (default lt operator)", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); + + const v1 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await object.advanceFakeTime(10_000); + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + + // Filter to only videos created before a cutoff that excludes v2 + const cutoff = new Date(FAKE_TIME_START + 5_000).toISOString(); + const filtered = (await sendCmdToWorker(mf, "videos.list", { + params: { before: cutoff }, + })) as Video[]; + + expect(filtered).toHaveLength(1); + expect(filtered[0].id).toBe(v1.id); + }); + + test("list filters by after date (default gte operator)", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); + + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + await object.advanceFakeTime(10_000); + const v2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + // Filter to only videos created at or after a cutoff that excludes v1 + const cutoff = new Date(FAKE_TIME_START + 5_000).toISOString(); + const filtered = (await sendCmdToWorker(mf, "videos.list", { + params: { after: cutoff }, + })) as Video[]; + + expect(filtered).toHaveLength(1); + expect(filtered[0].id).toBe(v2.id); + }); + + test("list filters with combined before and after bounds", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); + + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); // t=0 + await object.advanceFakeTime(10_000); + const v2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; // t=10s + await object.advanceFakeTime(10_000); + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); // t=20s + + const after = new Date(FAKE_TIME_START + 5_000).toISOString(); + const before = new Date(FAKE_TIME_START + 15_000).toISOString(); + const filtered = (await sendCmdToWorker(mf, "videos.list", { + params: { after, before }, + })) as Video[]; + + expect(filtered).toHaveLength(1); + expect(filtered[0].id).toBe(v2.id); + }); + + test("list with non-default comparison operators (gt, lte)", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); + + const v1 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; // t=0 + await object.advanceFakeTime(10_000); + const v2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; // t=10s + + // afterComp=gt with v1's exact created time should NOT include v1 + const afterGtResult = (await sendCmdToWorker(mf, "videos.list", { + params: { after: v1.created, afterComp: "gt" }, + })) as Video[]; + expect(afterGtResult.map((v) => v.id)).not.toContain(v1.id); + expect(afterGtResult.map((v) => v.id)).toContain(v2.id); + + // beforeComp=lte with v1's exact created time should include v1 + const beforeLteResult = (await sendCmdToWorker(mf, "videos.list", { + params: { before: v1.created, beforeComp: "lte" }, + })) as Video[]; + expect(beforeLteResult.map((v) => v.id)).toContain(v1.id); + expect(beforeLteResult.map((v) => v.id)).not.toContain(v2.id); + }); +}); + +describe("Stream reloads", () => { + test("keeps in-memory data across setOptions reloads", async ({ expect }) => { + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const opts = { + compatibilityDate: STREAM_COMPAT_DATE, + stream: { binding: "STREAM" }, + streamPersist: false, + modules: true, + script: WORKER_SCRIPT, + } satisfies MiniflareOptions; + const mf = new Miniflare(opts); + useDispose(mf); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + await mf.setOptions({ + ...opts, + script: `${WORKER_SCRIPT}\n// reload stream worker`, + }); + + const details = (await sendCmdToWorker(mf, "video.details", { + id: video.id, + })) as Video; + expect(details.id).toBe(video.id); + + const videos = (await sendCmdToWorker(mf, "videos.list")) as Video[]; + expect(videos).toHaveLength(1); + expect(videos[0].id).toBe(video.id); + }); + + test("keeps persisted data when persistence path format changes on reload", async ({ + expect, + }) => { + const tmp = await useTmp(); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const opts = { + compatibilityDate: STREAM_COMPAT_DATE, + stream: { binding: "STREAM" }, + streamPersist: tmp, + modules: true, + script: WORKER_SCRIPT, + } satisfies MiniflareOptions; + const mf = new Miniflare(opts); + useDispose(mf); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + await mf.setOptions({ + ...opts, + streamPersist: pathToFileURL(tmp).href, + script: `${WORKER_SCRIPT}\n// reload persisted stream worker`, + }); + + const details = (await sendCmdToWorker(mf, "video.details", { + id: video.id, + })) as Video; + expect(details.id).toBe(video.id); + expect(details.size).toBe(TEST_VIDEO_BYTES.byteLength); + }); +}); + +describe("Stream captions", () => { + test("generate caption", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const caption = (await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + })) as Caption; + + expect(caption.language).toBe("en"); + expect(caption.generated).toBe(true); + expect(caption.status).toBe("ready"); + expect(caption.label).toBeTruthy(); + }); + + test("list captions", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "fr", + }); + + const captions = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + })) as Caption[]; + expect(captions).toHaveLength(2); + const languages = captions.map((c) => c.language); + expect(languages).toContain("en"); + expect(languages).toContain("fr"); + }); + + test("list captions filtered by language", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "fr", + }); + + const enOnly = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + language: "en", + })) as Caption[]; + expect(enOnly).toHaveLength(1); + expect(enOnly[0].language).toBe("en"); + }); + + test("list captions empty language filter", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + + const deOnly = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + language: "de", + })) as Caption[]; + expect(deOnly).toHaveLength(0); + }); + + test("delete caption", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.delete", { + id: video.id, + language: "en", + }); + + const remaining = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + })) as Caption[]; + expect(remaining).toHaveLength(0); + }); + + test("throws when deleting non existent caption", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + await expect( + sendCmdToWorker(mf, "captions.delete", { id: video.id, language: "zh" }) + ).rejects.toThrow("Caption not found"); + }); + + test("throws when getting caption for non existent video", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "captions.generate", { + id: "00000000-0000-0000-0000-000000000000", + language: "en", + }) + ).rejects.toThrow("Video not found"); + }); + + test("caption upload via File fails serialization across the binding", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "captions.upload", { + id: "00000000-0000-0000-0000-000000000000", + language: "en", + }) + ).rejects.toThrow( + 'Could not serialize object of type "File". This type does not support serialization.' + ); + }); + + test("generate caption is idempotent (upsert)", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + // Generate the same caption twice + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + + // Should still only have one caption, not two + const captions = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + })) as Caption[]; + expect(captions.filter((c) => c.language === "en")).toHaveLength(1); + }); + + test("list captions throws for non-existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "captions.list", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); + + test("delete caption throws for non-existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "captions.delete", { + id: "00000000-0000-0000-0000-000000000000", + language: "en", + }) + ).rejects.toThrow("Caption not found"); + }); +}); + +describe("Stream watermarks", () => { + test("create watermark from URL", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "test-watermark" }, + })) as Watermark; + + expect(watermark.id).toBeTruthy(); + expect(watermark.name).toBe("test-watermark"); + expect(watermark.size).toBe(TEST_IMAGE_BYTES.byteLength); + expect(watermark.created).toBeTruthy(); + + expect(watermark.opacity).toBe(1.0); + expect(watermark.padding).toBe(0.05); + expect(watermark.scale).toBe(0.15); + expect(watermark.position).toBe("upperRight"); + }); + + test("create watermark with custom params", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { + name: "custom", + opacity: 0.5, + padding: 0.1, + scale: 0.3, + position: "center", + }, + })) as Watermark; + + expect(watermark.opacity).toBe(0.5); + expect(watermark.padding).toBe(0.1); + expect(watermark.scale).toBe(0.3); + expect(watermark.position).toBe("center"); + }); + + test("create watermark from URL propagates fetch failures", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer(statusListener(404, "Missing")); + + await expect( + sendCmdToWorker(mf, "watermarks.generate.fromUrl", { + url: imageUrl.toString(), + params: { name: "missing" }, + }) + ).rejects.toThrow("Failed to fetch watermark from URL: 404 Missing"); + }); + + test("create watermark via File fails serialization across the binding", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "watermarks.generate.fromFile", { + params: { name: "unsupported" }, + }) + ).rejects.toThrow( + 'Could not serialize object of type "File". This type does not support serialization.' + ); + }); + + test("list watermarks", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "wm1" }, + }); + await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "wm2" }, + }); + + const list = (await sendCmdToWorker(mf, "watermarks.list")) as Watermark[]; + expect(list).toHaveLength(2); + const names = list.map((w) => w.name); + expect(names).toContain("wm1"); + expect(names).toContain("wm2"); + }); + + test("get watermark", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const created = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "get-test" }, + })) as Watermark; + const fetched = (await sendCmdToWorker(mf, "watermarks.get", { + id: created.id, + })) as Watermark; + + expect(fetched.id).toBe(created.id); + expect(fetched.name).toBe("get-test"); + }); + + test("get non-existent watermark throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "watermarks.get", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Watermark not found"); + }); + + test("delete watermark", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "delete-me" }, + })) as Watermark; + await sendCmdToWorker(mf, "watermarks.delete", { id: watermark.id }); + + const list = (await sendCmdToWorker(mf, "watermarks.list")) as Watermark[]; + expect(list).toHaveLength(0); + }); + + test("throws when deleting non existent watermark", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "watermarks.delete", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Watermark not found"); + }); + + test("opacity out of range throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + await expect( + sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { opacity: 2.0 }, + }) + ).rejects.toThrow("opacity must be between 0.0 and 1.0"); + }); + + test("padding out of range throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + await expect( + sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { padding: -0.1 }, + }) + ).rejects.toThrow("padding must be between 0.0 and 1.0"); + }); + + test("scale out of range throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + await expect( + sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { scale: 1.1 }, + }) + ).rejects.toThrow("scale must be between 0.0 and 1.0"); + }); + + test("boundary values 0.0 and 1.0 are accepted for range params", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + // 0.0 and 1.0 are valid boundary values — should not throw + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { opacity: 0.0, padding: 1.0, scale: 0.0 }, + })) as Watermark; + + expect(watermark.id).toBeTruthy(); + expect(watermark.opacity).toBe(0.0); + expect(watermark.padding).toBe(1.0); + expect(watermark.scale).toBe(0.0); + }); + + test("watermark created with empty name stores empty string", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: {}, + })) as Watermark; + + expect(watermark.name).toBe(""); + }); + + test("create watermark from ReadableStream stores correct size", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + // The "watermarks.generate" op fetches the URL and passes resp.body (ReadableStream) + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "stream-wm" }, + })) as Watermark; + + expect(watermark.size).toBe(TEST_IMAGE_BYTES.byteLength); + // downloadedFrom is null when created from a stream (not a URL) + expect(watermark.downloadedFrom).toBeNull(); + }); +}); + +describe("Stream downloads", () => { + test("generate default download", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const result = (await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + })) as DownloadGetResponse; + + expect(result.default).toBeDefined(); + expect(result.default?.status).toBe("ready"); + expect(result.default?.percentComplete).toBe(100); + }); + + test("generate audio download", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const result = (await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "audio", + })) as DownloadGetResponse; + + expect(result.audio).toBeDefined(); + expect(result.audio?.status).toBe("ready"); + }); + + test("get downloads", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "default", + }); + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "audio", + }); + + const result = (await sendCmdToWorker(mf, "downloads.get", { + id: video.id, + })) as DownloadGetResponse; + expect(result.default).toBeDefined(); + expect(result.audio).toBeDefined(); + }); + + test("get downloads when none exist returns empty object", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const result = (await sendCmdToWorker(mf, "downloads.get", { + id: video.id, + })) as DownloadGetResponse; + + expect(result.default).toBeUndefined(); + expect(result.audio).toBeUndefined(); + }); + + test("delete download", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "default", + }); + await sendCmdToWorker(mf, "downloads.delete", { + id: video.id, + type: "default", + }); + + const result = (await sendCmdToWorker(mf, "downloads.get", { + id: video.id, + })) as DownloadGetResponse; + expect(result.default).toBeUndefined(); + }); + + test("throws when deleting non existent download", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + await expect( + sendCmdToWorker(mf, "downloads.delete", { id: video.id, type: "default" }) + ).rejects.toThrow("Download not found"); + }); + + test("throws on download of non existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "downloads.generate", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); + + test("get downloads for non existent video throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "downloads.get", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); + + test("generate download is idempotent (upsert)", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + // Generate the same download twice + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "default", + }); + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "default", + }); + + // Should still only have one default download entry + const result = (await sendCmdToWorker(mf, "downloads.get", { + id: video.id, + })) as DownloadGetResponse; + expect(result.default).toBeDefined(); + expect(result.default?.status).toBe("ready"); + // audio should not be present + expect(result.audio).toBeUndefined(); + }); + + test("delete download throws for non-existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "downloads.delete", { + id: "00000000-0000-0000-0000-000000000000", + type: "default", + }) + ).rejects.toThrow("Download not found"); + }); +}); + +describe("Stream unsupported binding operations", () => { + test("createDirectUpload is not supported", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect(sendCmdToWorker(mf, "createDirectUpload")).rejects.toThrow( + "createDirectUpload is not supported in local mode" + ); + }); +}); + +describe("Stream deletes clean up properly", () => { + test("deleting video cleans up captions and downloads", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const id = video.id; + + // Create associated data + await sendCmdToWorker(mf, "captions.generate", { id, language: "en" }); + await sendCmdToWorker(mf, "downloads.generate", { id, type: "default" }); + + // Delete the video + await sendCmdToWorker(mf, "video.delete", { id }); + + // Video is gone + await expect(sendCmdToWorker(mf, "video.details", { id })).rejects.toThrow( + "Video not found" + ); + + // Captions are gone (via FK cascade) + await expect(sendCmdToWorker(mf, "captions.list", { id })).rejects.toThrow( + "Video not found" + ); + + // Downloads are gone (via explicit delete + cascade) + await expect(sendCmdToWorker(mf, "downloads.get", { id })).rejects.toThrow( + "Video not found" + ); + }); + + test("deleting video cleans up its blob from storage", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "video.delete", { id: video.id }); + + // Upload a new video to ensure storage is still functional + const video2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + expect(video2.id).not.toBe(video.id); + const details = (await sendCmdToWorker(mf, "video.details", { + id: video2.id, + })) as Video; + expect(details.size).toBe(TEST_VIDEO_BYTES.byteLength); + }); + + test("deleting one video does not affect another video's captions and downloads", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const videoA = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const videoB = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + // Add captions and downloads to both + await sendCmdToWorker(mf, "captions.generate", { + id: videoA.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.generate", { + id: videoB.id, + language: "en", + }); + await sendCmdToWorker(mf, "downloads.generate", { + id: videoA.id, + type: "default", + }); + await sendCmdToWorker(mf, "downloads.generate", { + id: videoB.id, + type: "default", + }); + + // Delete only video A + await sendCmdToWorker(mf, "video.delete", { id: videoA.id }); + + // videoB's captions and downloads should be unaffected + const captionsB = (await sendCmdToWorker(mf, "captions.list", { + id: videoB.id, + })) as Caption[]; + expect(captionsB).toHaveLength(1); + expect(captionsB[0].language).toBe("en"); + + const downloadsB = (await sendCmdToWorker(mf, "downloads.get", { + id: videoB.id, + })) as DownloadGetResponse; + expect(downloadsB.default).toBeDefined(); + }); +}); diff --git a/packages/wrangler/src/dev/miniflare/index.ts b/packages/wrangler/src/dev/miniflare/index.ts index 5710af8114..0223e79081 100644 --- a/packages/wrangler/src/dev/miniflare/index.ts +++ b/packages/wrangler/src/dev/miniflare/index.ts @@ -423,6 +423,7 @@ type WorkerOptionsBindings = Pick< | "additionalUnboundDurableObjects" | "media" | "versionMetadata" + | "stream" >; type MiniflareBindingsConfig = Pick< @@ -504,6 +505,7 @@ export function buildMiniflareBindingOptions( "version_metadata", bindings ); + const streamBindings = extractBindingsOfType("stream", bindings); const fetchers = extractBindingsOfType("fetcher", bindings); // Setup blob and module bindings @@ -784,6 +786,16 @@ export function buildMiniflareBindingOptions( : undefined, } : undefined, + stream: + streamBindings.length > 0 + ? { + binding: streamBindings[0].binding, + remoteProxyConnectionString: + streamBindings[0].remote && remoteProxyConnectionString + ? remoteProxyConnectionString + : undefined, + } + : undefined, vectorize: Object.fromEntries( vectorizeBindings.map((vectorize) => {