From 5183f9dabee92a6424df863439287946d4537f87 Mon Sep 17 00:00:00 2001 From: nwong Date: Mon, 23 Mar 2026 18:23:22 -0500 Subject: [PATCH 01/11] feat: add stream binding local mode --- .changeset/stream-binding-local-mode.md | 6 + packages/miniflare/src/index.ts | 12 +- packages/miniflare/src/plugins/index.ts | 5 + .../miniflare/src/plugins/stream/index.ts | 185 ++++ .../src/workers/shared/index.worker.ts | 2 +- .../src/workers/stream/binding.worker.ts | 260 ++++++ .../miniflare/src/workers/stream/errors.ts | 31 + .../src/workers/stream/object.worker.ts | 584 ++++++++++++ .../miniflare/src/workers/stream/schemas.ts | 206 +++++ .../test/plugins/stream/index.spec.ts | 862 ++++++++++++++++++ packages/wrangler/src/dev/miniflare/index.ts | 12 + 11 files changed, 2163 insertions(+), 2 deletions(-) create mode 100644 .changeset/stream-binding-local-mode.md create mode 100644 packages/miniflare/src/plugins/stream/index.ts create mode 100644 packages/miniflare/src/workers/stream/binding.worker.ts create mode 100644 packages/miniflare/src/workers/stream/errors.ts create mode 100644 packages/miniflare/src/workers/stream/object.worker.ts create mode 100644 packages/miniflare/src/workers/stream/schemas.ts create mode 100644 packages/miniflare/test/plugins/stream/index.spec.ts diff --git a/.changeset/stream-binding-local-mode.md b/.changeset/stream-binding-local-mode.md new file mode 100644 index 0000000000..6b50045c52 --- /dev/null +++ b/.changeset/stream-binding-local-mode.md @@ -0,0 +1,6 @@ +--- +"miniflare": minor +"wrangler": minor +--- + +Add local mode support for Stream bindings diff --git a/packages/miniflare/src/index.ts b/packages/miniflare/src/index.ts index 6eca63cd3c..bf5817ca3f 100644 --- a/packages/miniflare/src/index.ts +++ b/packages/miniflare/src/index.ts @@ -70,6 +70,7 @@ import { SharedOptions, SOCKET_ENTRY, SOCKET_ENTRY_LOCAL, + STREAM_PLUGIN_NAME, WorkerOptions, WrappedBindingNames, } from "./plugins"; @@ -150,6 +151,7 @@ import type { KVNamespaceListKey, Queue, R2Bucket, + StreamBinding, } from "@cloudflare/workers-types/experimental"; import type { Process } from "@puppeteer/browsers"; @@ -2167,7 +2169,9 @@ export class Miniflare { const urlSafeHost = getURLSafeHost(configuredHost); if (this.#sharedOpts.core.logRequests) { this.#log.logReady( - `${ready} on ${green(`${secure ? "https" : "http"}://${urlSafeHost}:${entryPort}`)}` + `${ready} on ${green( + `${secure ? "https" : "http"}://${urlSafeHost}:${entryPort}` + )}` ); } @@ -2763,6 +2767,12 @@ export class Miniflare { }> { return this.#getProxy(HELLO_WORLD_PLUGIN_NAME, bindingName, workerName); } + getStreamBinding( + bindingName: string, + workerName?: string + ): Promise { + return this.#getProxy(STREAM_PLUGIN_NAME, bindingName, workerName); + } /** @internal */ _getInternalDurableObjectNamespace( diff --git a/packages/miniflare/src/plugins/index.ts b/packages/miniflare/src/plugins/index.ts index 449f88d051..9cc332befa 100644 --- a/packages/miniflare/src/plugins/index.ts +++ b/packages/miniflare/src/plugins/index.ts @@ -31,6 +31,7 @@ import { QUEUES_PLUGIN, QUEUES_PLUGIN_NAME } from "./queues"; import { R2_PLUGIN, R2_PLUGIN_NAME } from "./r2"; import { RATELIMIT_PLUGIN, RATELIMIT_PLUGIN_NAME } from "./ratelimit"; import { SECRET_STORE_PLUGIN, SECRET_STORE_PLUGIN_NAME } from "./secret-store"; +import { STREAM_PLUGIN, STREAM_PLUGIN_NAME } from "./stream"; import { VECTORIZE_PLUGIN, VECTORIZE_PLUGIN_NAME } from "./vectorize"; import { VERSION_METADATA_PLUGIN, @@ -63,6 +64,7 @@ export const PLUGINS = { [BROWSER_RENDERING_PLUGIN_NAME]: BROWSER_RENDERING_PLUGIN, [DISPATCH_NAMESPACE_PLUGIN_NAME]: DISPATCH_NAMESPACE_PLUGIN, [IMAGES_PLUGIN_NAME]: IMAGES_PLUGIN, + [STREAM_PLUGIN_NAME]: STREAM_PLUGIN, [VECTORIZE_PLUGIN_NAME]: VECTORIZE_PLUGIN, [VPC_SERVICES_PLUGIN_NAME]: VPC_SERVICES_PLUGIN, [MTLS_PLUGIN_NAME]: MTLS_PLUGIN, @@ -127,6 +129,7 @@ export type WorkerOptions = z.input & z.input & z.input & z.input & + z.input & z.input & z.input & z.input & @@ -145,6 +148,7 @@ export type SharedOptions = z.input & z.input & z.input & z.input & + z.input & z.input; export const PLUGIN_ENTRIES = Object.entries(PLUGINS) as [ @@ -206,6 +210,7 @@ export * from "./ai"; export * from "./browser-rendering"; export * from "./dispatch-namespace"; export * from "./images"; +export * from "./stream"; export * from "./vectorize"; export * from "./vpc-services"; export * from "./mtls"; diff --git a/packages/miniflare/src/plugins/stream/index.ts b/packages/miniflare/src/plugins/stream/index.ts new file mode 100644 index 0000000000..ebc2dc7b63 --- /dev/null +++ b/packages/miniflare/src/plugins/stream/index.ts @@ -0,0 +1,185 @@ +import fs from "node:fs/promises"; +import BINDING_SCRIPT from "worker:stream/binding"; +import OBJECT_SCRIPT from "worker:stream/object"; +import { z } from "zod"; +import { SharedBindings } from "../../workers"; +import { + getMiniflareObjectBindings, + getPersistPath, + getUserBindingServiceName, + PersistenceSchema, + Plugin, + ProxyNodeBinding, + remoteProxyClientWorker, + RemoteProxyConnectionString, +} from "../shared"; +import type { Service } from "../../runtime"; + +const StreamSchema = z.object({ + binding: z.string(), + remoteProxyConnectionString: z + .custom() + .optional(), +}); + +export const StreamOptionsSchema = z.object({ + stream: StreamSchema.optional(), +}); + +export const StreamSharedOptionsSchema = z.object({ + streamPersist: PersistenceSchema, +}); + +export const STREAM_PLUGIN_NAME = "stream"; +const STREAM_STORAGE_SERVICE_NAME = `${STREAM_PLUGIN_NAME}:storage`; +const STREAM_OBJECT_SERVICE_NAME = `${STREAM_PLUGIN_NAME}:object`; +export const STREAM_OBJECT_CLASS_NAME = "StreamObject"; + +export const STREAM_COMPAT_DATE = "2026-03-23"; + +export const STREAM_PLUGIN: Plugin< + typeof StreamOptionsSchema, + typeof StreamSharedOptionsSchema +> = { + options: StreamOptionsSchema, + sharedOptions: StreamSharedOptionsSchema, + async getBindings(options) { + if (!options.stream) { + return []; + } + + return [ + { + name: options.stream.binding, + service: { + name: getUserBindingServiceName( + STREAM_PLUGIN_NAME, + options.stream.binding, + options.stream.remoteProxyConnectionString + ), + entrypoint: "StreamBinding", + }, + }, + ]; + }, + getNodeBindings(options: z.infer) { + if (!options.stream) { + return {}; + } + return { + [options.stream.binding]: new ProxyNodeBinding(), + }; + }, + async getServices({ + options, + sharedOptions, + tmpPath, + defaultPersistRoot, + unsafeStickyBlobs, + }) { + if (!options.stream) { + return []; + } + + const serviceName = getUserBindingServiceName( + STREAM_PLUGIN_NAME, + options.stream.binding, + options.stream.remoteProxyConnectionString + ); + + if (options.stream.remoteProxyConnectionString) { + return [ + { + name: serviceName, + worker: remoteProxyClientWorker( + options.stream.remoteProxyConnectionString, + options.stream.binding + ), + }, + ]; + } + + const persistPath = getPersistPath( + STREAM_PLUGIN_NAME, + tmpPath, + defaultPersistRoot, + sharedOptions.streamPersist + ); + await fs.mkdir(persistPath, { recursive: true }); + + // Disk storage for blobs and SQL + const storageService = { + name: STREAM_STORAGE_SERVICE_NAME, + disk: { path: persistPath, writable: true }, + } satisfies Service; + + // StreamObject + const objectService = { + name: STREAM_OBJECT_SERVICE_NAME, + worker: { + compatibilityDate: STREAM_COMPAT_DATE, + compatibilityFlags: ["nodejs_compat", "experimental"], + modules: [ + { + name: "object.worker.js", + esModule: OBJECT_SCRIPT(), + }, + ], + durableObjectNamespaces: [ + { + className: STREAM_OBJECT_CLASS_NAME, + uniqueKey: `miniflare-${STREAM_OBJECT_CLASS_NAME}`, + enableSql: true, + }, + ], + durableObjectStorage: { localDisk: STREAM_STORAGE_SERVICE_NAME }, + bindings: [ + { + name: SharedBindings.MAYBE_SERVICE_BLOBS, + service: { name: STREAM_STORAGE_SERVICE_NAME }, + }, + ...getMiniflareObjectBindings(unsafeStickyBlobs), + ], + // Allow the DO to send outbound HTTP requests (fetching watermark images) + globalOutbound: { name: "internet" }, + }, + } satisfies Service; + + // Entrypoint with RPC + const bindingService = { + name: serviceName, + worker: { + compatibilityDate: STREAM_COMPAT_DATE, + compatibilityFlags: ["nodejs_compat", "experimental"], + modules: [ + { + name: "binding.worker.js", + esModule: BINDING_SCRIPT(), + }, + ], + bindings: [ + { + name: "store", + durableObjectNamespace: { + className: STREAM_OBJECT_CLASS_NAME, + serviceName: STREAM_OBJECT_SERVICE_NAME, + }, + }, + ], + // Allow the binding worker to send outbound HTTP requests + // (e.g. fetching video from URL in upload fn) + globalOutbound: { name: "internet" }, + }, + } satisfies Service; + + return [storageService, objectService, bindingService]; + }, + getPersistPath({ streamPersist }, tmpPath) { + return getPersistPath( + STREAM_PLUGIN_NAME, + tmpPath, + undefined, + streamPersist + ); + }, +}; diff --git a/packages/miniflare/src/workers/shared/index.worker.ts b/packages/miniflare/src/workers/shared/index.worker.ts index 840ca7191b..e126a93ee2 100644 --- a/packages/miniflare/src/workers/shared/index.worker.ts +++ b/packages/miniflare/src/workers/shared/index.worker.ts @@ -45,7 +45,7 @@ export { } from "./router.worker"; export type { RouteHandler } from "./router.worker"; -export { get, all, drain } from "./sql.worker"; +export { get, all, drain, createTypedSql } from "./sql.worker"; export type { TypedValue, TypedResult, diff --git a/packages/miniflare/src/workers/stream/binding.worker.ts b/packages/miniflare/src/workers/stream/binding.worker.ts new file mode 100644 index 0000000000..07682b9ddd --- /dev/null +++ b/packages/miniflare/src/workers/stream/binding.worker.ts @@ -0,0 +1,260 @@ +import { RpcTarget, WorkerEntrypoint } from "cloudflare:workers"; +import { BadRequestError, InvalidURLError } from "./errors"; +import { + rowToStreamCaption, + rowToStreamDownload, + rowToStreamVideo, + rowToStreamWatermark, +} from "./schemas"; +import type { StreamObject } from "./object.worker"; + +interface Env { + store: DurableObjectNamespace; +} + +function getStub(env: Env): DurableObjectStub { + const id = env.store.idFromName("stream-data"); + return env.store.get(id); +} + +function rowsToDownloadResponse( + rows: { type: string; download: StreamDownload }[] +): StreamDownloadGetResponse { + const result: StreamDownloadGetResponse = {}; + for (const { type, download } of rows) { + if (type === "default") result.default = download; + else if (type === "audio") result.audio = download; + } + return result; +} + +export class StreamBinding extends WorkerEntrypoint { + async upload( + urlOrBody: string | ReadableStream, + params?: StreamUrlUploadParams + ): Promise { + let body: ReadableStream; + if (typeof urlOrBody === "string") { + const response = await fetch(urlOrBody); + if (!response.ok || response.body === null) { + throw new InvalidURLError( + `Failed to fetch video from URL: ${response.status} ${response.statusText}` + ); + } + body = response.body; + } else { + body = urlOrBody; + } + const stub = getStub(this.env); + const row = await stub.createVideo(body, params ?? {}); + return rowToStreamVideo(row); + } + + // Not supported in local mode yet + async createDirectUpload( + _params: StreamDirectUploadCreateParams + ): Promise { + throw new BadRequestError( + "createDirectUpload is not supported in local mode" + ); + } + + video(id: string): StreamVideoHandle { + return new StreamVideoHandleImpl(this.env, id); + } + + get videos(): StreamVideos { + return new StreamVideosImpl(this.env); + } + + get watermarks(): StreamWatermarks { + return new StreamWatermarksImpl(this.env); + } +} + +class StreamScopedCaptionsImpl + extends RpcTarget + implements StreamScopedCaptions +{ + readonly #env: Env; + readonly #videoId: string; + + constructor(env: Env, videoId: string) { + super(); + this.#env = env; + this.#videoId = videoId; + } + + async upload(_language: string, _file: File): Promise { + throw new BadRequestError( + "caption upload via File is not supported in local mode" + ); + } + + async generate(language: string): Promise { + const stub = getStub(this.#env); + const row = await stub.generateCaption(this.#videoId, language); + return rowToStreamCaption(row); + } + + async list(language?: string): Promise { + const stub = getStub(this.#env); + const rows = await stub.listCaptions(this.#videoId, language); + return rows.map(rowToStreamCaption); + } + + async delete(language: string): Promise { + const stub = getStub(this.#env); + await stub.deleteCaption(this.#videoId, language); + } +} + +class StreamScopedDownloadsImpl + extends RpcTarget + implements StreamScopedDownloads +{ + readonly #env: Env; + readonly #videoId: string; + + constructor(env: Env, videoId: string) { + super(); + this.#env = env; + this.#videoId = videoId; + } + + async generate( + downloadType: StreamDownloadType = "default" + ): Promise { + const stub = getStub(this.#env); + const rows = await stub.generateDownload(this.#videoId, downloadType); + return rowsToDownloadResponse(rows.map(rowToStreamDownload)); + } + + async get(): Promise { + const stub = getStub(this.#env); + const rows = await stub.listDownloads(this.#videoId); + return rowsToDownloadResponse(rows.map(rowToStreamDownload)); + } + + async delete(downloadType: StreamDownloadType = "default"): Promise { + const stub = getStub(this.#env); + await stub.deleteDownload(this.#videoId, downloadType); + } +} + +class StreamVideoHandleImpl extends RpcTarget implements StreamVideoHandle { + readonly id: string; + readonly #env: Env; + + constructor(env: Env, id: string) { + super(); + this.#env = env; + this.id = id; + } + + async details(): Promise { + const stub = getStub(this.#env); + const row = await stub.getVideo(this.id); + return rowToStreamVideo(row); + } + + async update(params: StreamUpdateVideoParams): Promise { + const stub = getStub(this.#env); + const row = await stub.updateVideo(this.id, params); + return rowToStreamVideo(row); + } + + async delete(): Promise { + const stub = getStub(this.#env); + await stub.deleteVideo(this.id); + } + + async generateToken(): Promise { + const stub = getStub(this.#env); + return stub.generateToken(this.id); + } + + get downloads(): StreamScopedDownloads { + return new StreamScopedDownloadsImpl(this.#env, this.id); + } + + get captions(): StreamScopedCaptions { + return new StreamScopedCaptionsImpl(this.#env, this.id); + } +} + +class StreamVideosImpl extends RpcTarget implements StreamVideos { + readonly #env: Env; + + constructor(env: Env) { + super(); + this.#env = env; + } + + async list(params?: StreamVideosListParams): Promise { + const stub = getStub(this.#env); + const rows = await stub.listVideos(params); + return rows.map(rowToStreamVideo); + } +} + +class StreamWatermarksImpl extends RpcTarget implements StreamWatermarks { + readonly #env: Env; + + constructor(env: Env) { + super(); + this.#env = env; + } + + async generate( + fileOrUrl: File | string | ReadableStream, + params: StreamWatermarkCreateParams + ): Promise { + if (fileOrUrl instanceof File) { + throw new BadRequestError( + "watermark generation via File is not supported in local mode" + ); + } + if ( + params.opacity !== undefined && + (params.opacity < 0 || params.opacity > 1) + ) { + throw new BadRequestError("opacity must be between 0.0 and 1.0"); + } + if ( + params.padding !== undefined && + (params.padding < 0 || params.padding > 1) + ) { + throw new BadRequestError("padding must be between 0.0 and 1.0"); + } + if (params.scale !== undefined && (params.scale < 0 || params.scale > 1)) { + throw new BadRequestError("scale must be between 0.0 and 1.0"); + } + const stub = getStub(this.#env); + if (typeof fileOrUrl === "string") { + const row = await stub.createWatermarkFromUrl(fileOrUrl, params); + return rowToStreamWatermark(row); + } + // ReadableStream — pre-fetched data passed directly + const buffer = await new Response(fileOrUrl).arrayBuffer(); + const row = await stub.createWatermarkFromBody(buffer, null, params); + return rowToStreamWatermark(row); + } + + async list(): Promise { + const stub = getStub(this.#env); + const rows = await stub.listWatermarks(); + return rows.map(rowToStreamWatermark); + } + + async get(watermarkId: string): Promise { + const stub = getStub(this.#env); + const row = await stub.getWatermark(watermarkId); + return rowToStreamWatermark(row); + } + + async delete(watermarkId: string): Promise { + const stub = getStub(this.#env); + await stub.deleteWatermark(watermarkId); + } +} diff --git a/packages/miniflare/src/workers/stream/errors.ts b/packages/miniflare/src/workers/stream/errors.ts new file mode 100644 index 0000000000..a633d7c4db --- /dev/null +++ b/packages/miniflare/src/workers/stream/errors.ts @@ -0,0 +1,31 @@ +export class StreamBindingError extends Error implements StreamError { + constructor( + message: string, + readonly code: number, + readonly statusCode: number + ) { + super(message); + this.name = "StreamBindingError"; + } +} + +export class BadRequestError extends StreamBindingError { + constructor(message = "Bad Request") { + super(message, 10005, 400); + this.name = "BadRequestError"; + } +} + +export class NotFoundError extends StreamBindingError { + constructor(message = "Not Found") { + super(message, 10003, 404); + this.name = "NotFoundError"; + } +} + +export class InvalidURLError extends StreamBindingError { + constructor(message = "Invalid URL") { + super(message, 10010, 400); + this.name = "InvalidURLError"; + } +} diff --git a/packages/miniflare/src/workers/stream/object.worker.ts b/packages/miniflare/src/workers/stream/object.worker.ts new file mode 100644 index 0000000000..68a15c1352 --- /dev/null +++ b/packages/miniflare/src/workers/stream/object.worker.ts @@ -0,0 +1,584 @@ +import { DurableObject } from "cloudflare:workers"; +import { all, BlobStore, createTypedSql, get } from "miniflare:shared"; +import { BadRequestError, InvalidURLError, NotFoundError } from "./errors"; +import { SQL_SCHEMA } from "./schemas"; +import type { + CaptionRow, + DownloadRow, + VideoRow, + WatermarkRow, +} from "./schemas"; +import type { BlobId, TypedSql } from "miniflare:shared"; + +const BLOB_NAMESPACE = "stream-data"; + +interface Env { + MINIFLARE_BLOBS?: Fetcher; + MINIFLARE_STICKY_BLOBS?: boolean; +} + +export class StreamObject extends DurableObject { + readonly #blob: BlobStore; + readonly #db: TypedSql; + readonly #stmts: ReturnType; + + constructor(state: DurableObjectState, env: Env) { + super(state, env); + const db = createTypedSql(state.storage); + db.exec("PRAGMA foreign_keys = ON"); + db.exec(SQL_SCHEMA); + this.#db = db; + this.#stmts = sqlStmts(db); + const stickyBlobs = !!env.MINIFLARE_STICKY_BLOBS; + this.#blob = new BlobStore( + env.MINIFLARE_BLOBS as Fetcher, + BLOB_NAMESPACE, + stickyBlobs + ); + } + + async createVideo( + body: ReadableStream | null, + params: StreamUrlUploadParams + ): Promise { + const id = crypto.randomUUID(); + const now = new Date().toISOString(); + + let blobId: BlobId | null = null; + let size = 0; + + if (body !== null) { + // Count bytes while streaming through to blob storage + const { readable, writable } = new TransformStream< + Uint8Array, + Uint8Array + >({ + transform(chunk, controller) { + size += chunk.byteLength; + controller.enqueue(chunk); + }, + }); + [blobId] = await Promise.all([ + this.#blob.put(readable), + body.pipeTo(writable), + ]); + } + + this.#stmts.insertVideo({ + id, + creator: params.creator ?? null, + meta: JSON.stringify(params.meta ?? {}), + allowed_origins: JSON.stringify(params.allowedOrigins ?? []), + require_signed_urls: params.requireSignedURLs ? 1 : 0, + scheduled_deletion: params.scheduledDeletion ?? null, + thumbnail_timestamp_pct: params.thumbnailTimestampPct ?? 0, + created: now, + modified: now, + uploaded: body !== null ? now : null, + status_state: body !== null ? "ready" : "pendingupload", + ready_to_stream: body !== null ? 1 : 0, + size, + blob_id: blobId, + }); + + const row = get(this.#stmts.getVideo({ id })); + if (row === undefined) throw new NotFoundError(`Video not found: ${id}`); + return row; + } + + async getVideo(id: string): Promise { + const row = get(this.#stmts.getVideo({ id })); + if (row === undefined) throw new NotFoundError(`Video not found: ${id}`); + return row; + } + + async updateVideo( + id: string, + params: StreamUpdateVideoParams + ): Promise { + return this.#stmts.updateVideo(id, params); + } + + async deleteVideo(id: string): Promise { + const blobIds = this.#stmts.deleteVideo(id); + await Promise.all(blobIds.map((b) => this.#blob.delete(b))); + } + + async listVideos(params?: StreamVideosListParams): Promise { + const db = this.#db; + const conditions: string[] = []; + const values: (string | number | null)[] = []; + + const compToSql = (comp: string) => { + const ops: Record = { + eq: "=", + gt: ">", + gte: ">=", + lt: "<", + lte: "<=", + }; + const op = ops[comp]; + if (op === undefined) { + throw new BadRequestError(`Invalid comparison operator: ${comp}`); + } + return op; + }; + + if (params?.before !== undefined) { + const op = compToSql(params.beforeComp ?? "lt"); + conditions.push(`created ${op} ?`); + values.push(params.before); + } + if (params?.after !== undefined) { + const op = compToSql(params.afterComp ?? "gte"); + conditions.push(`created ${op} ?`); + values.push(params.after); + } + + if (conditions.length === 0) { + // If not using any filters we can use the listVideos statement + if (params?.limit === undefined) { + return all(this.#stmts.listVideos({})); + } + return all(this.#stmts.listVideosLimit({ limit: params.limit })); + } + + const where = `WHERE ${conditions.join(" AND ")}`; + const limit = params?.limit ?? 1000; + // Not using a prepared statement here, easier to just exec + return Array.from( + db.exec( + `SELECT * FROM _mf_stream_videos ${where} ORDER BY created DESC LIMIT ?`, + ...values, + limit + ) + ); + } + + async generateToken(id: string): Promise { + const row = get(this.#stmts.getVideo({ id })); + if (row === undefined) throw new NotFoundError(`Video not found: ${id}`); + + const payload = { + sub: id, + kid: "local-mode-key", + exp: Math.floor(Date.now() / 1000) + 6 * 60 * 60, + }; + return btoa(JSON.stringify(payload)); + } + + async generateCaption( + videoId: string, + language: string + ): Promise { + const video = get(this.#stmts.getVideo({ id: videoId })); + if (video === undefined) + throw new NotFoundError(`Video not found: ${videoId}`); + + const label = + new Intl.DisplayNames(["en"], { type: "language" }).of(language) ?? + language; + + this.#stmts.upsertCaption({ + video_id: videoId, + language, + generated: 1, + label, + status: "ready", + }); + + const row = get(this.#stmts.getCaption({ video_id: videoId, language })); + if (row === undefined) + throw new NotFoundError(`Caption not found: ${videoId}/${language}`); + return row; + } + + async listCaptions( + videoId: string, + language?: string + ): Promise { + const video = get(this.#stmts.getVideo({ id: videoId })); + if (video === undefined) + throw new NotFoundError(`Video not found: ${videoId}`); + + if (language !== undefined) { + const row = get(this.#stmts.getCaption({ video_id: videoId, language })); + return row !== undefined ? [row] : []; + } + return all(this.#stmts.listCaptionsByVideo({ video_id: videoId })); + } + + async deleteCaption(videoId: string, language: string): Promise { + const deleted = get( + this.#stmts.deleteCaption({ video_id: videoId, language }) + ); + if (deleted === undefined) { + throw new NotFoundError(`Caption not found: ${videoId}/${language}`); + } + if (deleted.blob_id !== null) { + await this.#blob.delete(deleted.blob_id); + } + } + + async createWatermarkFromUrl( + url: string, + params: StreamWatermarkCreateParams + ): Promise { + const response = await fetch(url); + if (!response.ok || response.body === null) { + throw new InvalidURLError( + `Failed to fetch watermark from URL: ${response.status} ${response.statusText}` + ); + } + + return this.createWatermarkFromBody( + await response.arrayBuffer(), + url, + params + ); + } + + async createWatermarkFromBody( + buffer: ArrayBuffer, + downloadedFrom: string | null, + params: StreamWatermarkCreateParams + ): Promise { + const size = buffer.byteLength; + const blobId = await this.#blob.put( + new Response(buffer).body as ReadableStream + ); + + const id = crypto.randomUUID(); + const now = new Date().toISOString(); + + this.#stmts.insertWatermark({ + id, + name: params.name ?? "", + size, + created: now, + downloaded_from: downloadedFrom ?? "", + opacity: params.opacity ?? 1.0, + padding: params.padding ?? 0.05, + scale: params.scale ?? 0.15, + position: params.position ?? "upperRight", + blob_id: blobId, + }); + + const row = get(this.#stmts.getWatermark({ id })); + if (row === undefined) + throw new NotFoundError(`Watermark not found: ${id}`); + return row; + } + + async getWatermark(id: string): Promise { + const row = get(this.#stmts.getWatermark({ id })); + if (row === undefined) + throw new NotFoundError(`Watermark not found: ${id}`); + return row; + } + + async listWatermarks(): Promise { + return all(this.#stmts.listWatermarks({})); + } + + async deleteWatermark(id: string): Promise { + const deleted = get(this.#stmts.deleteWatermark({ id })); + if (deleted === undefined) + throw new NotFoundError(`Watermark not found: ${id}`); + if (deleted.blob_id !== null) { + await this.#blob.delete(deleted.blob_id); + } + } + + async generateDownload( + videoId: string, + downloadType: StreamDownloadType = "default" + ): Promise { + const video = get(this.#stmts.getVideo({ id: videoId })); + if (video === undefined) + throw new NotFoundError(`Video not found: ${videoId}`); + + this.#stmts.upsertDownload({ + video_id: videoId, + download_type: downloadType, + status: "ready", + percent_complete: 100.0, + }); + + return all(this.#stmts.listDownloads({ video_id: videoId })); + } + + async listDownloads(videoId: string): Promise { + const video = get(this.#stmts.getVideo({ id: videoId })); + if (video === undefined) + throw new NotFoundError(`Video not found: ${videoId}`); + return all(this.#stmts.listDownloads({ video_id: videoId })); + } + + async deleteDownload( + videoId: string, + downloadType: StreamDownloadType = "default" + ): Promise { + const deleted = get( + this.#stmts.deleteDownload({ + video_id: videoId, + download_type: downloadType, + }) + ); + if (deleted === undefined) { + throw new NotFoundError(`Download not found: ${videoId}/${downloadType}`); + } + } +} + +// Helper functions to return all db statements +function sqlStmts(db: TypedSql) { + // Videos + + const stmtGetVideo = db.stmt, VideoRow>( + "SELECT * FROM _mf_stream_videos WHERE id = :id" + ); + + const stmtInsertVideo = db.stmt< + Pick< + VideoRow, + | "id" + | "creator" + | "meta" + | "allowed_origins" + | "require_signed_urls" + | "scheduled_deletion" + | "thumbnail_timestamp_pct" + | "created" + | "modified" + | "uploaded" + | "status_state" + | "ready_to_stream" + | "size" + | "blob_id" + > + >(`INSERT INTO _mf_stream_videos ( + id, creator, meta, allowed_origins, require_signed_urls, + scheduled_deletion, thumbnail_timestamp_pct, created, modified, + uploaded, status_state, ready_to_stream, size, blob_id + ) VALUES ( + :id, :creator, :meta, :allowed_origins, :require_signed_urls, + :scheduled_deletion, :thumbnail_timestamp_pct, :created, :modified, + :uploaded, :status_state, :ready_to_stream, :size, :blob_id + )`); + + const stmtUpdateVideo = db.stmt< + Pick< + VideoRow, + | "id" + | "modified" + | "creator" + | "meta" + | "allowed_origins" + | "require_signed_urls" + | "scheduled_deletion" + | "thumbnail_timestamp_pct" + | "max_duration_seconds" + > + >(`UPDATE _mf_stream_videos SET + modified = :modified, + creator = :creator, + meta = :meta, + allowed_origins = :allowed_origins, + require_signed_urls = :require_signed_urls, + scheduled_deletion = :scheduled_deletion, + thumbnail_timestamp_pct = :thumbnail_timestamp_pct, + max_duration_seconds = :max_duration_seconds + WHERE id = :id`); + + const stmtGetVideoCaptionBlobs = db.stmt< + Pick, + Pick + >("SELECT blob_id FROM _mf_stream_captions WHERE video_id = :id"); + + const stmtDeleteVideoDownloads = db.stmt>( + "DELETE FROM _mf_stream_downloads WHERE video_id = :id" + ); + + const stmtDeleteVideo = db.stmt< + Pick, + Pick + >("DELETE FROM _mf_stream_videos WHERE id = :id RETURNING blob_id"); + + const stmtListVideos = db.stmt, VideoRow>( + "SELECT * FROM _mf_stream_videos ORDER BY created DESC" + ); + + const stmtListVideosLimit = db.stmt<{ limit: number }, VideoRow>( + "SELECT * FROM _mf_stream_videos ORDER BY created DESC LIMIT :limit" + ); + + // Captions + + const stmtGetCaption = db.stmt< + Pick, + CaptionRow + >( + "SELECT * FROM _mf_stream_captions WHERE video_id = :video_id AND language = :language" + ); + + const stmtUpsertCaption = db.stmt< + Pick + >(`INSERT INTO _mf_stream_captions (video_id, language, generated, label, status) + VALUES (:video_id, :language, :generated, :label, :status) + ON CONFLICT (video_id, language) DO UPDATE SET + generated = excluded.generated, + label = excluded.label, + status = excluded.status`); + + const stmtListCaptionsByVideo = db.stmt< + Pick, + CaptionRow + >("SELECT * FROM _mf_stream_captions WHERE video_id = :video_id"); + + const stmtDeleteCaption = db.stmt< + Pick, + Pick + >( + "DELETE FROM _mf_stream_captions WHERE video_id = :video_id AND language = :language RETURNING blob_id" + ); + + // Watermarks + + const stmtGetWatermark = db.stmt, WatermarkRow>( + "SELECT * FROM _mf_stream_watermarks WHERE id = :id" + ); + + const stmtInsertWatermark = db.stmt< + Pick< + WatermarkRow, + | "id" + | "name" + | "size" + | "created" + | "downloaded_from" + | "opacity" + | "padding" + | "scale" + | "position" + | "blob_id" + > + >(`INSERT INTO _mf_stream_watermarks + (id, name, size, height, width, created, downloaded_from, opacity, padding, scale, position, blob_id) + VALUES (:id, :name, :size, 0, 0, :created, :downloaded_from, :opacity, :padding, :scale, :position, :blob_id)`); + + const stmtListWatermarks = db.stmt, WatermarkRow>( + "SELECT * FROM _mf_stream_watermarks ORDER BY created DESC" + ); + + const stmtDeleteWatermark = db.stmt< + Pick, + Pick + >("DELETE FROM _mf_stream_watermarks WHERE id = :id RETURNING blob_id"); + + // Downloads + + const stmtUpsertDownload = db.stmt< + Pick< + DownloadRow, + "video_id" | "download_type" | "status" | "percent_complete" + > + >(`INSERT INTO _mf_stream_downloads (video_id, download_type, status, percent_complete) + VALUES (:video_id, :download_type, :status, :percent_complete) + ON CONFLICT (video_id, download_type) DO UPDATE SET + status = excluded.status, + percent_complete = excluded.percent_complete`); + + const stmtListDownloads = db.stmt, DownloadRow>( + "SELECT * FROM _mf_stream_downloads WHERE video_id = :video_id" + ); + + const stmtDeleteDownload = db.stmt< + Pick, + Pick + >( + "DELETE FROM _mf_stream_downloads WHERE video_id = :video_id AND download_type = :download_type RETURNING video_id" + ); + + // Operations using transactions + + const deleteVideo = db.txn((id: string): BlobId[] => { + // Collect caption blob_ids before CASCADE deletes rows + const captionBlobs = all(stmtGetVideoCaptionBlobs({ id })) + .map((r) => r.blob_id) + .filter((b): b is BlobId => b !== null); + + stmtDeleteVideoDownloads({ id }); + + const videoRow = get(stmtDeleteVideo({ id })); + if (videoRow === undefined) + throw new NotFoundError(`Video not found: ${id}`); + + const blobIds = captionBlobs; + if (videoRow.blob_id !== null) blobIds.push(videoRow.blob_id); + return blobIds; + }); + + const updateVideo = db.txn( + (id: string, params: StreamUpdateVideoParams): VideoRow => { + const current = get(stmtGetVideo({ id })); + if (current === undefined) + throw new NotFoundError(`Video not found: ${id}`); + + const now = new Date().toISOString(); + stmtUpdateVideo({ + id, + modified: now, + creator: + "creator" in params ? (params.creator ?? null) : current.creator, + meta: + params.meta !== undefined + ? JSON.stringify(params.meta) + : current.meta, + allowed_origins: + params.allowedOrigins !== undefined + ? JSON.stringify(params.allowedOrigins) + : current.allowed_origins, + require_signed_urls: + params.requireSignedURLs !== undefined + ? params.requireSignedURLs + ? 1 + : 0 + : current.require_signed_urls, + scheduled_deletion: + "scheduledDeletion" in params + ? (params.scheduledDeletion ?? null) + : current.scheduled_deletion, + thumbnail_timestamp_pct: + params.thumbnailTimestampPct ?? current.thumbnail_timestamp_pct, + max_duration_seconds: + params.maxDurationSeconds ?? current.max_duration_seconds, + }); + + const updated = get(stmtGetVideo({ id })); + if (updated === undefined) + throw new NotFoundError(`Video not found: ${id}`); + return updated; + } + ); + + return { + getVideo: stmtGetVideo, + insertVideo: stmtInsertVideo, + updateVideo, + deleteVideo, + listVideos: stmtListVideos, + listVideosLimit: stmtListVideosLimit, + getCaption: stmtGetCaption, + upsertCaption: stmtUpsertCaption, + listCaptionsByVideo: stmtListCaptionsByVideo, + deleteCaption: stmtDeleteCaption, + getWatermark: stmtGetWatermark, + insertWatermark: stmtInsertWatermark, + listWatermarks: stmtListWatermarks, + deleteWatermark: stmtDeleteWatermark, + upsertDownload: stmtUpsertDownload, + listDownloads: stmtListDownloads, + deleteDownload: stmtDeleteDownload, + }; +} diff --git a/packages/miniflare/src/workers/stream/schemas.ts b/packages/miniflare/src/workers/stream/schemas.ts new file mode 100644 index 0000000000..4fc69a855f --- /dev/null +++ b/packages/miniflare/src/workers/stream/schemas.ts @@ -0,0 +1,206 @@ +export const SQL_SCHEMA = ` +CREATE TABLE IF NOT EXISTS _mf_stream_videos ( + id TEXT PRIMARY KEY, + creator TEXT, + thumbnail TEXT NOT NULL DEFAULT '', + thumbnail_timestamp_pct REAL NOT NULL DEFAULT 0.0, + ready_to_stream INTEGER NOT NULL DEFAULT 1, + ready_to_stream_at TEXT, + status_state TEXT NOT NULL DEFAULT 'ready', + status_pct_complete TEXT, + status_error_reason_code TEXT NOT NULL DEFAULT '', + status_error_reason_text TEXT NOT NULL DEFAULT '', + meta TEXT NOT NULL DEFAULT '{}', + created TEXT NOT NULL, + modified TEXT NOT NULL, + scheduled_deletion TEXT, + size INTEGER NOT NULL DEFAULT 0, + allowed_origins TEXT NOT NULL DEFAULT '[]', + require_signed_urls INTEGER, + uploaded TEXT, + upload_expiry TEXT, + max_size_bytes INTEGER, + max_duration_seconds INTEGER, + duration REAL NOT NULL DEFAULT -1.0, + input_width INTEGER NOT NULL DEFAULT 0, + input_height INTEGER NOT NULL DEFAULT 0, + live_input_id TEXT, + clipped_from_id TEXT, + blob_id TEXT +); + +CREATE TABLE IF NOT EXISTS _mf_stream_captions ( + video_id TEXT NOT NULL, + language TEXT NOT NULL, + generated INTEGER NOT NULL DEFAULT 0, + label TEXT NOT NULL DEFAULT '', + status TEXT, + blob_id TEXT, + PRIMARY KEY (video_id, language), + FOREIGN KEY (video_id) REFERENCES _mf_stream_videos(id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS _mf_stream_watermarks ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL DEFAULT '', + size INTEGER NOT NULL DEFAULT 0, + height INTEGER NOT NULL DEFAULT 0, + width INTEGER NOT NULL DEFAULT 0, + created TEXT NOT NULL, + downloaded_from TEXT, + opacity REAL NOT NULL DEFAULT 1.0, + padding REAL NOT NULL DEFAULT 0.05, + scale REAL NOT NULL DEFAULT 0.15, + position TEXT NOT NULL DEFAULT 'upperRight', + blob_id TEXT +); + +CREATE TABLE IF NOT EXISTS _mf_stream_downloads ( + video_id TEXT NOT NULL, + download_type TEXT NOT NULL DEFAULT 'default', + status TEXT NOT NULL DEFAULT 'ready', + percent_complete REAL NOT NULL DEFAULT 100.0, + url TEXT, + PRIMARY KEY (video_id, download_type), + FOREIGN KEY (video_id) REFERENCES _mf_stream_videos(id) ON DELETE CASCADE +); +`; + +export type VideoRow = { + id: string; + creator: string | null; + thumbnail: string; + thumbnail_timestamp_pct: number; + ready_to_stream: number; + ready_to_stream_at: string | null; + status_state: string; + status_pct_complete: string | null; + status_error_reason_code: string; + status_error_reason_text: string; + meta: string; + created: string; + modified: string; + scheduled_deletion: string | null; + size: number; + allowed_origins: string; + require_signed_urls: number | null; + uploaded: string | null; + upload_expiry: string | null; + max_size_bytes: number | null; + max_duration_seconds: number | null; + duration: number; + input_width: number; + input_height: number; + live_input_id: string | null; + clipped_from_id: string | null; + blob_id: string | null; +}; + +export type CaptionRow = { + video_id: string; + language: string; + generated: number; + label: string; + status: string | null; + blob_id: string | null; +}; + +export type WatermarkRow = { + id: string; + name: string; + size: number; + height: number; + width: number; + created: string; + downloaded_from: string | null; + opacity: number; + padding: number; + scale: number; + position: string; + blob_id: string | null; +}; + +export type DownloadRow = { + video_id: string; + download_type: string; + status: string; + percent_complete: number; + url: string | null; +}; + +export function rowToStreamVideo(row: VideoRow): StreamVideo { + const baseUrl = `https://customer-placeholder.cloudflarestream.com/${row.id}`; + return { + id: row.id, + creator: row.creator, + thumbnail: row.thumbnail || `${baseUrl}/thumbnails/thumbnail.jpg`, + thumbnailTimestampPct: row.thumbnail_timestamp_pct, + readyToStream: row.ready_to_stream === 1, + readyToStreamAt: row.ready_to_stream_at, + status: { + state: row.status_state, + pctComplete: row.status_pct_complete ?? undefined, + errorReasonCode: row.status_error_reason_code, + errorReasonText: row.status_error_reason_text, + }, + meta: JSON.parse(row.meta) as Record, + created: row.created, + modified: row.modified, + scheduledDeletion: row.scheduled_deletion, + size: row.size, + preview: `${baseUrl}/watch`, + allowedOrigins: JSON.parse(row.allowed_origins) as string[], + requireSignedURLs: row.require_signed_urls === 1 ? true : null, + uploaded: row.uploaded, + uploadExpiry: row.upload_expiry, + maxSizeBytes: row.max_size_bytes, + maxDurationSeconds: row.max_duration_seconds, + duration: row.duration, + input: { width: row.input_width, height: row.input_height }, + hlsPlaybackUrl: `${baseUrl}/manifest/video.m3u8`, + dashPlaybackUrl: `${baseUrl}/manifest/video.mpd`, + watermark: null, + liveInputId: row.live_input_id, + clippedFromId: row.clipped_from_id, + publicDetails: null, + }; +} + +export function rowToStreamCaption(row: CaptionRow): StreamCaption { + return { + language: row.language, + label: row.label || row.language, + generated: row.generated === 1, + status: row.status as StreamCaption["status"], + }; +} + +export function rowToStreamWatermark(row: WatermarkRow): StreamWatermark { + return { + id: row.id, + name: row.name, + size: row.size, + height: row.height, + width: row.width, + created: row.created, + downloadedFrom: row.downloaded_from, + opacity: row.opacity, + padding: row.padding, + scale: row.scale, + position: row.position as StreamWatermarkPosition, + }; +} + +export function rowToStreamDownload(row: DownloadRow): { + type: string; + download: StreamDownload; +} { + return { + type: row.download_type, + download: { + percentComplete: row.percent_complete, + status: row.status as StreamDownloadStatus, + url: row.url ?? undefined, + }, + }; +} diff --git a/packages/miniflare/test/plugins/stream/index.spec.ts b/packages/miniflare/test/plugins/stream/index.spec.ts new file mode 100644 index 0000000000..0505dab085 --- /dev/null +++ b/packages/miniflare/test/plugins/stream/index.spec.ts @@ -0,0 +1,862 @@ +import http from "node:http"; +import { Miniflare, STREAM_COMPAT_DATE } from "miniflare"; +import { describe, test } from "vitest"; +import { useDispose, useServer } from "../../test-shared"; +import type { + StreamCaption as Caption, + StreamDownloadGetResponse as DownloadGetResponse, + StreamVideo as Video, + StreamWatermark as Watermark, +} from "@cloudflare/workers-types"; + +// Mock image / video bytes +const TEST_VIDEO_BYTES = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]); +const TEST_IMAGE_BYTES = new Uint8Array([255, 216, 255, 224]); + +function staticBytesListener(bytes: Uint8Array): http.RequestListener { + return (_req, res) => { + res.writeHead(200, { "Content-Type": "application/octet-stream" }); + res.end(Buffer.from(bytes)); + }; +} + +const WORKER_SCRIPT = ` +export default { + async fetch(request, env) { + try { + const { op, args } = await request.json(); + const stream = env.STREAM; + const result = await handleCommand(stream, op, args || {}); + return Response.json({ ok: true, result }); + } catch (err) { + return Response.json({ ok: false, error: err.message }, { status: 200 }); + } + } +} + +async function handleCommand(stream, op, args) { + switch (op) { + case "upload": { + const resp = await fetch(args.url); + return stream.upload(resp.body, args.params); + } + case "video.details": + return stream.video(args.id).details(); + case "video.update": + return stream.video(args.id).update(args.params); + case "video.delete": + await stream.video(args.id).delete(); + return null; + case "video.generateToken": + return stream.video(args.id).generateToken(); + case "videos.list": + return stream.videos.list(args.params); + case "captions.generate": + return stream.video(args.id).captions.generate(args.language); + case "captions.list": + return stream.video(args.id).captions.list(args.language); + case "captions.delete": + await stream.video(args.id).captions.delete(args.language); + return null; + case "captions.upload": { + const file = new File(["test"], "captions.vtt"); + return stream.video(args.id).captions.upload(args.language, file); + } + case "downloads.generate": + return stream.video(args.id).downloads.generate(args.type); + case "downloads.get": + return stream.video(args.id).downloads.get(); + case "downloads.delete": + await stream.video(args.id).downloads.delete(args.type); + return null; + case "watermarks.generate": { + const resp = await fetch(args.url); + return stream.watermarks.generate(resp.body, args.params || {}); + } + case "watermarks.list": + return stream.watermarks.list(); + case "watermarks.get": + return stream.watermarks.get(args.id); + case "watermarks.delete": + await stream.watermarks.delete(args.id); + return null; + case "createDirectUpload": + return stream.createDirectUpload(args.params || {}); + default: + throw new Error("Unknown op: " + op); + } +} +`; + +function createMiniflare(): Miniflare { + return new Miniflare({ + compatibilityDate: STREAM_COMPAT_DATE, + stream: { binding: "STREAM" }, + streamPersist: false, + modules: true, + script: WORKER_SCRIPT, + }); +} + +async function sendCmdToWorker( + mf: Miniflare, + op: string, + args: Record = {} +): Promise { + const resp = await mf.dispatchFetch("http://placeholder", { + method: "POST", + body: JSON.stringify({ op, args }), + headers: { "Content-Type": "application/json" }, + }); + const data = (await resp.json()) as { + ok: boolean; + result: unknown; + error?: string; + }; + if (!data.ok) { + throw new Error(data.error); + } + return data.result; +} + +describe("Stream videos", () => { + test("upload and retrieve details", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + expect(video.id).toBeTruthy(); + expect(video.readyToStream).toBe(true); + expect(video.status.state).toBe("ready"); + expect(video.size).toBe(TEST_VIDEO_BYTES.byteLength); + expect(video.created).toBeTruthy(); + expect(video.modified).toBeTruthy(); + expect(video.hlsPlaybackUrl).toContain(video.id); + expect(video.dashPlaybackUrl).toContain(video.id); + + const details = (await sendCmdToWorker(mf, "video.details", { + id: video.id, + })) as Video; + expect(details.id).toBe(video.id); + expect(details.readyToStream).toBe(true); + }); + + test("upload with params", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + params: { + creator: "test-creator", + meta: { title: "Test Video" }, + requireSignedURLs: true, + thumbnailTimestampPct: 0.5, + }, + })) as Video; + + expect(video.creator).toBe("test-creator"); + expect(video.meta).toEqual({ title: "Test Video" }); + expect(video.requireSignedURLs).toBe(true); + expect(video.thumbnailTimestampPct).toBe(0.5); + }); + + test("throw when getting details for non existent video", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "video.details", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); + + test("update video metadata", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const originalModified = video.modified; + + const updated = (await sendCmdToWorker(mf, "video.update", { + id: video.id, + params: { + creator: "new-creator", + meta: { description: "Updated" }, + }, + })) as Video; + + expect(updated.id).toBe(video.id); + expect(updated.creator).toBe("new-creator"); + expect(updated.meta).toEqual({ description: "Updated" }); + expect(updated.modified).not.toBe(originalModified); + }); + + test("delete video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "video.delete", { id: video.id }); + + await expect( + sendCmdToWorker(mf, "video.details", { id: video.id }) + ).rejects.toThrow("Video not found"); + }); + + test("throws when deleting non existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "video.delete", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); + + test("generate token", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const token = (await sendCmdToWorker(mf, "video.generateToken", { + id: video.id, + })) as string; + + expect(typeof token).toBe("string"); + // Token is b64 encoded JSON + const payload = JSON.parse(atob(token)) as { + sub: string; + kid: string; + exp: number; + }; + expect(payload.sub).toBe(video.id); + expect(payload.kid).toBe("local-mode-key"); + expect(payload.exp).toBeGreaterThan(Math.floor(Date.now() / 1000)); + }); + + test("generate token for non-existent video throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "video.generateToken", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); +}); + +describe("Stream videos list", () => { + test("list empty", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + const videos = (await sendCmdToWorker(mf, "videos.list")) as Video[]; + expect(videos).toEqual([]); + }); + + test("list all videos", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + + const videos = (await sendCmdToWorker(mf, "videos.list")) as Video[]; + expect(videos).toHaveLength(3); + for (const v of videos) { + expect(v.id).toBeTruthy(); + } + }); + + test("list with limit", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + for (let i = 0; i < 5; i++) { + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + } + + const limited = (await sendCmdToWorker(mf, "videos.list", { + params: { limit: 2 }, + })) as Video[]; + expect(limited).toHaveLength(2); + }); + + test("list ordered by created descending", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const v1 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await new Promise((r) => setTimeout(r, 5)); + const v2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await new Promise((r) => setTimeout(r, 5)); + const v3 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + const videos = (await sendCmdToWorker(mf, "videos.list")) as Video[]; + // Newest first + expect(videos[0].id).toBe(v3.id); + expect(videos[1].id).toBe(v2.id); + expect(videos[2].id).toBe(v1.id); + }); +}); + +describe("Stream captions", () => { + test("generate caption", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const caption = (await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + })) as Caption; + + expect(caption.language).toBe("en"); + expect(caption.generated).toBe(true); + expect(caption.status).toBe("ready"); + expect(caption.label).toBeTruthy(); + }); + + test("list captions", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "fr", + }); + + const captions = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + })) as Caption[]; + expect(captions).toHaveLength(2); + const languages = captions.map((c) => c.language); + expect(languages).toContain("en"); + expect(languages).toContain("fr"); + }); + + test("list captions filtered by language", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "fr", + }); + + const enOnly = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + language: "en", + })) as Caption[]; + expect(enOnly).toHaveLength(1); + expect(enOnly[0].language).toBe("en"); + }); + + test("list captions empty language filter", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + + const deOnly = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + language: "de", + })) as Caption[]; + expect(deOnly).toHaveLength(0); + }); + + test("delete caption", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.delete", { + id: video.id, + language: "en", + }); + + const remaining = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + })) as Caption[]; + expect(remaining).toHaveLength(0); + }); + + test("throws when deleting non existent caption", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + await expect( + sendCmdToWorker(mf, "captions.delete", { id: video.id, language: "zh" }) + ).rejects.toThrow("Caption not found"); + }); + + test("throws when getting caption for non existent video", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "captions.generate", { + id: "00000000-0000-0000-0000-000000000000", + language: "en", + }) + ).rejects.toThrow("Video not found"); + }); +}); + +describe("Stream watermarks", () => { + test("create watermark from URL", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "test-watermark" }, + })) as Watermark; + + expect(watermark.id).toBeTruthy(); + expect(watermark.name).toBe("test-watermark"); + expect(watermark.size).toBe(TEST_IMAGE_BYTES.byteLength); + expect(watermark.created).toBeTruthy(); + + expect(watermark.opacity).toBe(1.0); + expect(watermark.padding).toBe(0.05); + expect(watermark.scale).toBe(0.15); + expect(watermark.position).toBe("upperRight"); + }); + + test("create watermark with custom params", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { + name: "custom", + opacity: 0.5, + padding: 0.1, + scale: 0.3, + position: "center", + }, + })) as Watermark; + + expect(watermark.opacity).toBe(0.5); + expect(watermark.padding).toBe(0.1); + expect(watermark.scale).toBe(0.3); + expect(watermark.position).toBe("center"); + }); + + test("list watermarks", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "wm1" }, + }); + await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "wm2" }, + }); + + const list = (await sendCmdToWorker(mf, "watermarks.list")) as Watermark[]; + expect(list).toHaveLength(2); + const names = list.map((w) => w.name); + expect(names).toContain("wm1"); + expect(names).toContain("wm2"); + }); + + test("get watermark", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const created = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "get-test" }, + })) as Watermark; + const fetched = (await sendCmdToWorker(mf, "watermarks.get", { + id: created.id, + })) as Watermark; + + expect(fetched.id).toBe(created.id); + expect(fetched.name).toBe("get-test"); + }); + + test("get non-existent watermark throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "watermarks.get", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Watermark not found"); + }); + + test("delete watermark", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "delete-me" }, + })) as Watermark; + await sendCmdToWorker(mf, "watermarks.delete", { id: watermark.id }); + + const list = (await sendCmdToWorker(mf, "watermarks.list")) as Watermark[]; + expect(list).toHaveLength(0); + }); + + test("throws when deleting non existent watermark", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "watermarks.delete", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Watermark not found"); + }); + + test("opacity out of range throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + await expect( + sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { opacity: 2.0 }, + }) + ).rejects.toThrow("opacity must be between 0.0 and 1.0"); + }); + + test("padding out of range throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + await expect( + sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { padding: -0.1 }, + }) + ).rejects.toThrow("padding must be between 0.0 and 1.0"); + }); +}); + +describe("Stream downloads", () => { + test("generate default download", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const result = (await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + })) as DownloadGetResponse; + + expect(result.default).toBeDefined(); + expect(result.default!.status).toBe("ready"); + expect(result.default!.percentComplete).toBe(100); + }); + + test("generate audio download", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const result = (await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "audio", + })) as DownloadGetResponse; + + expect(result.audio).toBeDefined(); + expect(result.audio!.status).toBe("ready"); + }); + + test("get downloads", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "default", + }); + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "audio", + }); + + const result = (await sendCmdToWorker(mf, "downloads.get", { + id: video.id, + })) as DownloadGetResponse; + expect(result.default).toBeDefined(); + expect(result.audio).toBeDefined(); + }); + + test("get downloads when none exist returns empty object", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const result = (await sendCmdToWorker(mf, "downloads.get", { + id: video.id, + })) as DownloadGetResponse; + + expect(result.default).toBeUndefined(); + expect(result.audio).toBeUndefined(); + }); + + test("delete download", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "default", + }); + await sendCmdToWorker(mf, "downloads.delete", { + id: video.id, + type: "default", + }); + + const result = (await sendCmdToWorker(mf, "downloads.get", { + id: video.id, + })) as DownloadGetResponse; + expect(result.default).toBeUndefined(); + }); + + test("throws when deleting non existent download", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + await expect( + sendCmdToWorker(mf, "downloads.delete", { id: video.id, type: "default" }) + ).rejects.toThrow("Download not found"); + }); + + test("throws on download of non existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "downloads.generate", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); +}); + +describe("Stream deletes clean up properly", () => { + test("deleting video cleans up captions and downloads", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const id = video.id; + + // Create associated data + await sendCmdToWorker(mf, "captions.generate", { id, language: "en" }); + await sendCmdToWorker(mf, "downloads.generate", { id, type: "default" }); + + // Delete the video + await sendCmdToWorker(mf, "video.delete", { id }); + + // Video is gone + await expect(sendCmdToWorker(mf, "video.details", { id })).rejects.toThrow( + "Video not found" + ); + + // Captions are gone (via FK cascade) + await expect(sendCmdToWorker(mf, "captions.list", { id })).rejects.toThrow( + "Video not found" + ); + + // Downloads are gone (via explicit delete + cascade) + await expect(sendCmdToWorker(mf, "downloads.get", { id })).rejects.toThrow( + "Video not found" + ); + }); + + test("deleting video cleans up its blob from storage", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await sendCmdToWorker(mf, "video.delete", { id: video.id }); + + // Upload a new video to ensure storage is still functional + const video2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + expect(video2.id).not.toBe(video.id); + const details = (await sendCmdToWorker(mf, "video.details", { + id: video2.id, + })) as Video; + expect(details.size).toBe(TEST_VIDEO_BYTES.byteLength); + }); +}); diff --git a/packages/wrangler/src/dev/miniflare/index.ts b/packages/wrangler/src/dev/miniflare/index.ts index 5710af8114..0223e79081 100644 --- a/packages/wrangler/src/dev/miniflare/index.ts +++ b/packages/wrangler/src/dev/miniflare/index.ts @@ -423,6 +423,7 @@ type WorkerOptionsBindings = Pick< | "additionalUnboundDurableObjects" | "media" | "versionMetadata" + | "stream" >; type MiniflareBindingsConfig = Pick< @@ -504,6 +505,7 @@ export function buildMiniflareBindingOptions( "version_metadata", bindings ); + const streamBindings = extractBindingsOfType("stream", bindings); const fetchers = extractBindingsOfType("fetcher", bindings); // Setup blob and module bindings @@ -784,6 +786,16 @@ export function buildMiniflareBindingOptions( : undefined, } : undefined, + stream: + streamBindings.length > 0 + ? { + binding: streamBindings[0].binding, + remoteProxyConnectionString: + streamBindings[0].remote && remoteProxyConnectionString + ? remoteProxyConnectionString + : undefined, + } + : undefined, vectorize: Object.fromEntries( vectorizeBindings.map((vectorize) => { From 1eb2f9f2900007274e6bf0807c5b0061cb282359 Mon Sep 17 00:00:00 2001 From: nwong Date: Tue, 24 Mar 2026 09:54:51 -0500 Subject: [PATCH 02/11] fix: test undefined assertions --- packages/miniflare/test/plugins/stream/index.spec.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/miniflare/test/plugins/stream/index.spec.ts b/packages/miniflare/test/plugins/stream/index.spec.ts index 0505dab085..35b93c135e 100644 --- a/packages/miniflare/test/plugins/stream/index.spec.ts +++ b/packages/miniflare/test/plugins/stream/index.spec.ts @@ -677,8 +677,8 @@ describe("Stream downloads", () => { })) as DownloadGetResponse; expect(result.default).toBeDefined(); - expect(result.default!.status).toBe("ready"); - expect(result.default!.percentComplete).toBe(100); + expect(result.default?.status).toBe("ready"); + expect(result.default?.percentComplete).toBe(100); }); test("generate audio download", async ({ expect }) => { @@ -697,7 +697,7 @@ describe("Stream downloads", () => { })) as DownloadGetResponse; expect(result.audio).toBeDefined(); - expect(result.audio!.status).toBe("ready"); + expect(result.audio?.status).toBe("ready"); }); test("get downloads", async ({ expect }) => { From 1d81d81f5aba396cacc4479cb0146daca196cdfa Mon Sep 17 00:00:00 2001 From: nwong Date: Tue, 24 Mar 2026 09:55:34 -0500 Subject: [PATCH 03/11] fix: clean up videos pagination logic --- .../src/workers/stream/object.worker.ts | 76 +++++++++---------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/packages/miniflare/src/workers/stream/object.worker.ts b/packages/miniflare/src/workers/stream/object.worker.ts index 68a15c1352..960eb4b6e4 100644 --- a/packages/miniflare/src/workers/stream/object.worker.ts +++ b/packages/miniflare/src/workers/stream/object.worker.ts @@ -105,54 +105,50 @@ export class StreamObject extends DurableObject { } async listVideos(params?: StreamVideosListParams): Promise { - const db = this.#db; - const conditions: string[] = []; - const values: (string | number | null)[] = []; - - const compToSql = (comp: string) => { - const ops: Record = { - eq: "=", - gt: ">", - gte: ">=", - lt: "<", - lte: "<=", - }; - const op = ops[comp]; - if (op === undefined) { - throw new BadRequestError(`Invalid comparison operator: ${comp}`); + if (!params?.before && !params?.after) { + if (params?.limit === undefined) { + return all(this.#stmts.listVideos({})); } - return op; + return all(this.#stmts.listVideosLimit({ limit: params.limit })); + } + + const compToSql: Record = { + eq: "=", + gt: ">", + gte: ">=", + lt: "<", + lte: "<=", }; + const conditions: string[] = []; + const values: (string | number)[] = []; - if (params?.before !== undefined) { - const op = compToSql(params.beforeComp ?? "lt"); - conditions.push(`created ${op} ?`); + if (params.before !== undefined) { + const op = compToSql[params.beforeComp ?? "lt"]; + if (op === undefined) { + throw new BadRequestError( + "Invalid comparison operator: " + String(params.beforeComp) + ); + } + conditions.push("created " + op + " ?"); values.push(params.before); } - if (params?.after !== undefined) { - const op = compToSql(params.afterComp ?? "gte"); - conditions.push(`created ${op} ?`); - values.push(params.after); - } - - if (conditions.length === 0) { - // If not using any filters we can use the listVideos statement - if (params?.limit === undefined) { - return all(this.#stmts.listVideos({})); + if (params.after !== undefined) { + const op = compToSql[params.afterComp ?? "gte"]; + if (op === undefined) { + throw new BadRequestError( + "Invalid comparison operator: " + String(params.afterComp) + ); } - return all(this.#stmts.listVideosLimit({ limit: params.limit })); + conditions.push("created " + op + " ?"); + values.push(params.after); } - const where = `WHERE ${conditions.join(" AND ")}`; - const limit = params?.limit ?? 1000; - // Not using a prepared statement here, easier to just exec - return Array.from( - db.exec( - `SELECT * FROM _mf_stream_videos ${where} ORDER BY created DESC LIMIT ?`, - ...values, - limit - ) - ); + values.push(params.limit ?? 1000); + const sql = + "SELECT * FROM _mf_stream_videos WHERE " + + conditions.join(" AND ") + + " ORDER BY created DESC LIMIT ?"; + return Array.from(this.#db.exec(sql, ...values)); } async generateToken(id: string): Promise { From 2fb08df33f256345855df98328b60d105c75a31a Mon Sep 17 00:00:00 2001 From: nwong Date: Wed, 25 Mar 2026 09:58:58 -0500 Subject: [PATCH 04/11] fix: narrow download type to StreamDownloadType --- packages/miniflare/src/workers/stream/binding.worker.ts | 5 ++--- packages/miniflare/src/workers/stream/object.worker.ts | 2 +- packages/miniflare/src/workers/stream/schemas.ts | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/miniflare/src/workers/stream/binding.worker.ts b/packages/miniflare/src/workers/stream/binding.worker.ts index 07682b9ddd..27bd299230 100644 --- a/packages/miniflare/src/workers/stream/binding.worker.ts +++ b/packages/miniflare/src/workers/stream/binding.worker.ts @@ -18,12 +18,11 @@ function getStub(env: Env): DurableObjectStub { } function rowsToDownloadResponse( - rows: { type: string; download: StreamDownload }[] + rows: { type: StreamDownloadType; download: StreamDownload }[] ): StreamDownloadGetResponse { const result: StreamDownloadGetResponse = {}; for (const { type, download } of rows) { - if (type === "default") result.default = download; - else if (type === "audio") result.audio = download; + result[type] = download; } return result; } diff --git a/packages/miniflare/src/workers/stream/object.worker.ts b/packages/miniflare/src/workers/stream/object.worker.ts index 960eb4b6e4..6d3755b337 100644 --- a/packages/miniflare/src/workers/stream/object.worker.ts +++ b/packages/miniflare/src/workers/stream/object.worker.ts @@ -105,7 +105,7 @@ export class StreamObject extends DurableObject { } async listVideos(params?: StreamVideosListParams): Promise { - if (!params?.before && !params?.after) { + if (params?.before === undefined && params?.after === undefined) { if (params?.limit === undefined) { return all(this.#stmts.listVideos({})); } diff --git a/packages/miniflare/src/workers/stream/schemas.ts b/packages/miniflare/src/workers/stream/schemas.ts index 4fc69a855f..a234e3e8c1 100644 --- a/packages/miniflare/src/workers/stream/schemas.ts +++ b/packages/miniflare/src/workers/stream/schemas.ts @@ -122,7 +122,7 @@ export type WatermarkRow = { export type DownloadRow = { video_id: string; - download_type: string; + download_type: StreamDownloadType; status: string; percent_complete: number; url: string | null; @@ -192,7 +192,7 @@ export function rowToStreamWatermark(row: WatermarkRow): StreamWatermark { } export function rowToStreamDownload(row: DownloadRow): { - type: string; + type: StreamDownloadType; download: StreamDownload; } { return { From daebf5e33118545a04ea668f9a76d7ef07226217 Mon Sep 17 00:00:00 2001 From: nwong Date: Wed, 25 Mar 2026 10:24:55 -0500 Subject: [PATCH 05/11] fix: update requireSignedURLS collapsing --- packages/miniflare/src/workers/stream/schemas.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/miniflare/src/workers/stream/schemas.ts b/packages/miniflare/src/workers/stream/schemas.ts index a234e3e8c1..d5f016e38c 100644 --- a/packages/miniflare/src/workers/stream/schemas.ts +++ b/packages/miniflare/src/workers/stream/schemas.ts @@ -150,7 +150,8 @@ export function rowToStreamVideo(row: VideoRow): StreamVideo { size: row.size, preview: `${baseUrl}/watch`, allowedOrigins: JSON.parse(row.allowed_origins) as string[], - requireSignedURLs: row.require_signed_urls === 1 ? true : null, + requireSignedURLs: + row.require_signed_urls === null ? null : row.require_signed_urls === 1, uploaded: row.uploaded, uploadExpiry: row.upload_expiry, maxSizeBytes: row.max_size_bytes, From 78c47db8fbead6948690a17715d0ce6b50bfe85d Mon Sep 17 00:00:00 2001 From: nwong Date: Wed, 25 Mar 2026 10:48:12 -0500 Subject: [PATCH 06/11] fix: update changeset description --- .changeset/stream-binding-local-mode.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/.changeset/stream-binding-local-mode.md b/.changeset/stream-binding-local-mode.md index 6b50045c52..450962f127 100644 --- a/.changeset/stream-binding-local-mode.md +++ b/.changeset/stream-binding-local-mode.md @@ -4,3 +4,20 @@ --- Add local mode support for Stream bindings + +Miniflare and `wrangler dev` now support using [Cloudflare Stream](https://developers.cloudflare.com/stream/) bindings locally. + +Supported operations: +- `upload()` — upload video via URL +- `video(id).details()`, `.update()`, `.delete()`, `.generateToken()` +- `videos.list()` +- `captions.generate()`, `.list()`, `.delete()` +- `downloads.generate()`, `.get()`, `.delete()` +- `watermarks.generate()`, `.list()`, `.get()`, `.delete()` + +The following are not yet supported in local mode and will throw: +- `createDirectUpload()` +- Caption upload via `File` +- Watermark generation via `File` + +Data is persisted across restarts by default. You must set `streamPersist: false` in Miniflare options to disable persistence. From 43fcb4719c6144e8d35650162d955ca31cec6008 Mon Sep 17 00:00:00 2001 From: nwong Date: Wed, 25 Mar 2026 11:37:54 -0500 Subject: [PATCH 07/11] fix: add timers and ops --- .../src/workers/stream/object.worker.ts | 56 ++++++++++++++++--- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/packages/miniflare/src/workers/stream/object.worker.ts b/packages/miniflare/src/workers/stream/object.worker.ts index 6d3755b337..14d87a2bd3 100644 --- a/packages/miniflare/src/workers/stream/object.worker.ts +++ b/packages/miniflare/src/workers/stream/object.worker.ts @@ -1,5 +1,12 @@ import { DurableObject } from "cloudflare:workers"; -import { all, BlobStore, createTypedSql, get } from "miniflare:shared"; +import { + all, + BlobStore, + createTypedSql, + get, + SharedBindings, + Timers, +} from "miniflare:shared"; import { BadRequestError, InvalidURLError, NotFoundError } from "./errors"; import { SQL_SCHEMA } from "./schemas"; import type { @@ -15,20 +22,26 @@ const BLOB_NAMESPACE = "stream-data"; interface Env { MINIFLARE_BLOBS?: Fetcher; MINIFLARE_STICKY_BLOBS?: boolean; + [SharedBindings.MAYBE_JSON_ENABLE_CONTROL_ENDPOINTS]?: boolean; } export class StreamObject extends DurableObject { + readonly timers = new Timers(); readonly #blob: BlobStore; readonly #db: TypedSql; readonly #stmts: ReturnType; + #now() { + return new Date(this.timers.now()).toISOString(); + } + constructor(state: DurableObjectState, env: Env) { super(state, env); const db = createTypedSql(state.storage); db.exec("PRAGMA foreign_keys = ON"); db.exec(SQL_SCHEMA); this.#db = db; - this.#stmts = sqlStmts(db); + this.#stmts = sqlStmts(db, () => this.#now()); const stickyBlobs = !!env.MINIFLARE_STICKY_BLOBS; this.#blob = new BlobStore( env.MINIFLARE_BLOBS as Fetcher, @@ -42,7 +55,7 @@ export class StreamObject extends DurableObject { params: StreamUrlUploadParams ): Promise { const id = crypto.randomUUID(); - const now = new Date().toISOString(); + const now = this.#now(); let blobId: BlobId | null = null; let size = 0; @@ -158,7 +171,7 @@ export class StreamObject extends DurableObject { const payload = { sub: id, kid: "local-mode-key", - exp: Math.floor(Date.now() / 1000) + 6 * 60 * 60, + exp: Math.floor(this.timers.now() / 1000) + 6 * 60 * 60, }; return btoa(JSON.stringify(payload)); } @@ -245,7 +258,7 @@ export class StreamObject extends DurableObject { ); const id = crypto.randomUUID(); - const now = new Date().toISOString(); + const now = this.#now(); this.#stmts.insertWatermark({ id, @@ -325,10 +338,37 @@ export class StreamObject extends DurableObject { throw new NotFoundError(`Download not found: ${videoId}/${downloadType}`); } } + + async fetch( + req: Request + ) { + if (this.env[SharedBindings.MAYBE_JSON_ENABLE_CONTROL_ENDPOINTS] === true) { + const controlOp = req.cf?.miniflare?.controlOp; + if (controlOp !== undefined) { + const args = controlOp.args ?? []; + switch (controlOp.name) { + case "enableFakeTimers": + await this.timers.enableFakeTimers(args[0] as number); + return Response.json(null); + case "disableFakeTimers": + await this.timers.disableFakeTimers(); + return Response.json(null); + case "advanceFakeTime": + await this.timers.advanceFakeTime(args[0] as number); + return Response.json(null); + case "waitForFakeTasks": + await this.timers.waitForFakeTasks(); + return Response.json(null); + } + } + } + + return new Response(null, { status: 404 }); + } } // Helper functions to return all db statements -function sqlStmts(db: TypedSql) { +function sqlStmts(db: TypedSql, now: () => string) { // Videos const stmtGetVideo = db.stmt, VideoRow>( @@ -521,10 +561,10 @@ function sqlStmts(db: TypedSql) { if (current === undefined) throw new NotFoundError(`Video not found: ${id}`); - const now = new Date().toISOString(); + const nowValue = now(); stmtUpdateVideo({ id, - modified: now, + modified: nowValue, creator: "creator" in params ? (params.creator ?? null) : current.creator, meta: From f3ea61f0f27a83770fecd284fc6234c22c140760 Mon Sep 17 00:00:00 2001 From: nwong Date: Wed, 25 Mar 2026 11:38:42 -0500 Subject: [PATCH 08/11] fix: add more tests --- .../test/plugins/stream/index.spec.ts | 656 +++++++++++++++++- 1 file changed, 648 insertions(+), 8 deletions(-) diff --git a/packages/miniflare/test/plugins/stream/index.spec.ts b/packages/miniflare/test/plugins/stream/index.spec.ts index 35b93c135e..acc1ab0383 100644 --- a/packages/miniflare/test/plugins/stream/index.spec.ts +++ b/packages/miniflare/test/plugins/stream/index.spec.ts @@ -1,7 +1,19 @@ import http from "node:http"; -import { Miniflare, STREAM_COMPAT_DATE } from "miniflare"; +import { pathToFileURL } from "node:url"; +import { + Miniflare, + STREAM_COMPAT_DATE, + STREAM_OBJECT_CLASS_NAME, + STREAM_PLUGIN_NAME, +} from "miniflare"; +import type { MiniflareOptions } from "miniflare"; import { describe, test } from "vitest"; -import { useDispose, useServer } from "../../test-shared"; +import { + MiniflareDurableObjectControlStub, + useDispose, + useServer, + useTmp, +} from "../../test-shared"; import type { StreamCaption as Caption, StreamDownloadGetResponse as DownloadGetResponse, @@ -12,6 +24,8 @@ import type { // Mock image / video bytes const TEST_VIDEO_BYTES = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]); const TEST_IMAGE_BYTES = new Uint8Array([255, 216, 255, 224]); +const STREAM_OBJECT_NAME = "stream-data"; +const FAKE_TIME_START = 1_000_000; function staticBytesListener(bytes: Uint8Array): http.RequestListener { return (_req, res) => { @@ -20,6 +34,16 @@ function staticBytesListener(bytes: Uint8Array): http.RequestListener { }; } +function statusListener( + statusCode: number, + statusMessage?: string +): http.RequestListener { + return (_req, res) => { + res.writeHead(statusCode, statusMessage); + res.end(); + }; +} + const WORKER_SCRIPT = ` export default { async fetch(request, env) { @@ -29,7 +53,13 @@ export default { const result = await handleCommand(stream, op, args || {}); return Response.json({ ok: true, result }); } catch (err) { - return Response.json({ ok: false, error: err.message }, { status: 200 }); + return Response.json({ + ok: false, + error: err.message, + name: err.name, + code: err.code, + statusCode: err.statusCode, + }, { status: 200 }); } } } @@ -40,6 +70,8 @@ async function handleCommand(stream, op, args) { const resp = await fetch(args.url); return stream.upload(resp.body, args.params); } + case "upload.fromUrl": + return stream.upload(args.url, args.params); case "video.details": return stream.video(args.id).details(); case "video.update": @@ -73,6 +105,12 @@ async function handleCommand(stream, op, args) { const resp = await fetch(args.url); return stream.watermarks.generate(resp.body, args.params || {}); } + case "watermarks.generate.fromUrl": + return stream.watermarks.generate(args.url, args.params || {}); + case "watermarks.generate.fromFile": { + const file = new File(["test"], "watermark.png"); + return stream.watermarks.generate(file, args.params || {}); + } case "watermarks.list": return stream.watermarks.list(); case "watermarks.get": @@ -88,16 +126,32 @@ async function handleCommand(stream, op, args) { } `; -function createMiniflare(): Miniflare { +function createMiniflare(options: Partial = {}): Miniflare { return new Miniflare({ compatibilityDate: STREAM_COMPAT_DATE, stream: { binding: "STREAM" }, streamPersist: false, modules: true, script: WORKER_SCRIPT, - }); + ...options, + } as MiniflareOptions); } +async function getStreamObjectControl( + mf: Miniflare +): Promise { + const objectNamespace = await mf._getInternalDurableObjectNamespace( + STREAM_PLUGIN_NAME, + "stream:object", + STREAM_OBJECT_CLASS_NAME + ); + const objectId = objectNamespace.idFromName(STREAM_OBJECT_NAME); + const objectStub = objectNamespace.get(objectId); + return new MiniflareDurableObjectControlStub(objectStub); +} + +type CmdError = Error & { name: string; code?: number; statusCode?: number }; + async function sendCmdToWorker( mf: Miniflare, op: string, @@ -112,9 +166,16 @@ async function sendCmdToWorker( ok: boolean; result: unknown; error?: string; + name?: string; + code?: number; + statusCode?: number; }; if (!data.ok) { - throw new Error(data.error); + const err: CmdError = new Error(data.error); + err.name = data.name ?? "Error"; + err.code = data.code; + err.statusCode = data.statusCode; + throw err; } return data.result; } @@ -170,6 +231,18 @@ describe("Stream videos", () => { expect(video.thumbnailTimestampPct).toBe(0.5); }); + test("upload from URL propagates fetch failures", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer(statusListener(500, "Boom")); + + await expect( + sendCmdToWorker(mf, "upload.fromUrl", { + url: videoUrl.toString(), + }) + ).rejects.toThrow("Failed to fetch video from URL: 500 Boom"); + }); + test("throw when getting details for non existent video", async ({ expect, }) => { @@ -209,6 +282,18 @@ describe("Stream videos", () => { expect(updated.modified).not.toBe(originalModified); }); + test("throws when updating non existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "video.update", { + id: "00000000-0000-0000-0000-000000000000", + params: { creator: "nobody" }, + }) + ).rejects.toThrow("Video not found"); + }); + test("delete video", async ({ expect }) => { const mf = createMiniflare(); useDispose(mf); @@ -237,6 +322,64 @@ describe("Stream videos", () => { ).rejects.toThrow("Video not found"); }); + test("partial update preserves untouched fields", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + params: { + creator: "original-creator", + meta: { title: "Original" }, + requireSignedURLs: true, + thumbnailTimestampPct: 0.3, + }, + })) as Video; + + // Only update creator — all other fields should be preserved + const updated = (await sendCmdToWorker(mf, "video.update", { + id: video.id, + params: { creator: "new-creator" }, + })) as Video; + + expect(updated.creator).toBe("new-creator"); + expect(updated.meta).toEqual({ title: "Original" }); + expect(updated.requireSignedURLs).toBe(true); + expect(updated.thumbnailTimestampPct).toBe(0.3); + }); + + test("update can null-clear creator and scheduledDeletion", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + params: { + creator: "will-be-cleared", + scheduledDeletion: new Date(Date.now() + 86_400_000).toISOString(), + }, + })) as Video; + + expect(video.creator).toBe("will-be-cleared"); + expect(video.scheduledDeletion).toBeTruthy(); + + const updated = (await sendCmdToWorker(mf, "video.update", { + id: video.id, + params: { creator: null, scheduledDeletion: null }, + })) as Video; + + expect(updated.creator).toBeNull(); + expect(updated.scheduledDeletion).toBeNull(); + }); + test("generate token", async ({ expect }) => { const mf = createMiniflare(); useDispose(mf); @@ -319,21 +462,47 @@ describe("Stream videos list", () => { expect(limited).toHaveLength(2); }); + test("list rejects invalid before comparison operator", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "videos.list", { + params: { before: new Date().toISOString(), beforeComp: "wat" }, + }) + ).rejects.toThrow("Invalid comparison operator: wat"); + }); + + test("list rejects invalid after comparison operator", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "videos.list", { + params: { after: new Date().toISOString(), afterComp: "wat" }, + }) + ).rejects.toThrow("Invalid comparison operator: wat"); + }); + test("list ordered by created descending", async ({ expect }) => { const mf = createMiniflare(); useDispose(mf); const { http: videoUrl } = await useServer( staticBytesListener(TEST_VIDEO_BYTES) ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); const v1 = (await sendCmdToWorker(mf, "upload", { url: videoUrl.toString(), })) as Video; - await new Promise((r) => setTimeout(r, 5)); + await object.advanceFakeTime(5); const v2 = (await sendCmdToWorker(mf, "upload", { url: videoUrl.toString(), })) as Video; - await new Promise((r) => setTimeout(r, 5)); + await object.advanceFakeTime(5); const v3 = (await sendCmdToWorker(mf, "upload", { url: videoUrl.toString(), })) as Video; @@ -344,6 +513,192 @@ describe("Stream videos list", () => { expect(videos[1].id).toBe(v2.id); expect(videos[2].id).toBe(v1.id); }); + + test("list filters by before date (default lt operator)", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); + + const v1 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + await object.advanceFakeTime(10_000); + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + + // Filter to only videos created before a cutoff that excludes v2 + const cutoff = new Date(FAKE_TIME_START + 5_000).toISOString(); + const filtered = (await sendCmdToWorker(mf, "videos.list", { + params: { before: cutoff }, + })) as Video[]; + + expect(filtered).toHaveLength(1); + expect(filtered[0].id).toBe(v1.id); + }); + + test("list filters by after date (default gte operator)", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); + + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); + await object.advanceFakeTime(10_000); + const v2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + // Filter to only videos created at or after a cutoff that excludes v1 + const cutoff = new Date(FAKE_TIME_START + 5_000).toISOString(); + const filtered = (await sendCmdToWorker(mf, "videos.list", { + params: { after: cutoff }, + })) as Video[]; + + expect(filtered).toHaveLength(1); + expect(filtered[0].id).toBe(v2.id); + }); + + test("list filters with combined before and after bounds", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); + + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); // t=0 + await object.advanceFakeTime(10_000); + const v2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; // t=10s + await object.advanceFakeTime(10_000); + await sendCmdToWorker(mf, "upload", { url: videoUrl.toString() }); // t=20s + + const after = new Date(FAKE_TIME_START + 5_000).toISOString(); + const before = new Date(FAKE_TIME_START + 15_000).toISOString(); + const filtered = (await sendCmdToWorker(mf, "videos.list", { + params: { after, before }, + })) as Video[]; + + expect(filtered).toHaveLength(1); + expect(filtered[0].id).toBe(v2.id); + }); + + test("list with non-default comparison operators (gt, lte)", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const object = await getStreamObjectControl(mf); + await object.enableFakeTimers(FAKE_TIME_START); + + const v1 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; // t=0 + await object.advanceFakeTime(10_000); + const v2 = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; // t=10s + + // afterComp=gt with v1's exact created time should NOT include v1 + const afterGtResult = (await sendCmdToWorker(mf, "videos.list", { + params: { after: v1.created, afterComp: "gt" }, + })) as Video[]; + expect(afterGtResult.map((v) => v.id)).not.toContain(v1.id); + expect(afterGtResult.map((v) => v.id)).toContain(v2.id); + + // beforeComp=lte with v1's exact created time should include v1 + const beforeLteResult = (await sendCmdToWorker(mf, "videos.list", { + params: { before: v1.created, beforeComp: "lte" }, + })) as Video[]; + expect(beforeLteResult.map((v) => v.id)).toContain(v1.id); + expect(beforeLteResult.map((v) => v.id)).not.toContain(v2.id); + }); +}); + +describe("Stream reloads", () => { + test("keeps in-memory data across setOptions reloads", async ({ expect }) => { + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const opts = { + compatibilityDate: STREAM_COMPAT_DATE, + stream: { binding: "STREAM" }, + streamPersist: false, + modules: true, + script: WORKER_SCRIPT, + } satisfies MiniflareOptions; + const mf = new Miniflare(opts); + useDispose(mf); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + await mf.setOptions({ + ...opts, + script: `${WORKER_SCRIPT}\n// reload stream worker`, + }); + + const details = (await sendCmdToWorker(mf, "video.details", { + id: video.id, + })) as Video; + expect(details.id).toBe(video.id); + + const videos = (await sendCmdToWorker(mf, "videos.list")) as Video[]; + expect(videos).toHaveLength(1); + expect(videos[0].id).toBe(video.id); + }); + + test("keeps persisted data when persistence path format changes on reload", async ({ + expect, + }) => { + const tmp = await useTmp(); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + const opts = { + compatibilityDate: STREAM_COMPAT_DATE, + stream: { binding: "STREAM" }, + streamPersist: tmp, + modules: true, + script: WORKER_SCRIPT, + } satisfies MiniflareOptions; + const mf = new Miniflare(opts); + useDispose(mf); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + await mf.setOptions({ + ...opts, + streamPersist: pathToFileURL(tmp).href, + script: `${WORKER_SCRIPT}\n// reload persisted stream worker`, + }); + + const details = (await sendCmdToWorker(mf, "video.details", { + id: video.id, + })) as Video; + expect(details.id).toBe(video.id); + expect(details.size).toBe(TEST_VIDEO_BYTES.byteLength); + }); }); describe("Stream captions", () => { @@ -499,6 +854,73 @@ describe("Stream captions", () => { }) ).rejects.toThrow("Video not found"); }); + + test("caption upload via File fails serialization across the binding", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "captions.upload", { + id: "00000000-0000-0000-0000-000000000000", + language: "en", + }) + ).rejects.toThrow( + 'Could not serialize object of type "File". This type does not support serialization.' + ); + }); + + test("generate caption is idempotent (upsert)", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + // Generate the same caption twice + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.generate", { + id: video.id, + language: "en", + }); + + // Should still only have one caption, not two + const captions = (await sendCmdToWorker(mf, "captions.list", { + id: video.id, + })) as Caption[]; + expect(captions.filter((c) => c.language === "en")).toHaveLength(1); + }); + + test("list captions throws for non-existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "captions.list", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); + + test("delete caption throws for non-existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "captions.delete", { + id: "00000000-0000-0000-0000-000000000000", + language: "en", + }) + ).rejects.toThrow("Caption not found"); + }); }); describe("Stream watermarks", () => { @@ -549,6 +971,36 @@ describe("Stream watermarks", () => { expect(watermark.position).toBe("center"); }); + test("create watermark from URL propagates fetch failures", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer(statusListener(404, "Missing")); + + await expect( + sendCmdToWorker(mf, "watermarks.generate.fromUrl", { + url: imageUrl.toString(), + params: { name: "missing" }, + }) + ).rejects.toThrow("Failed to fetch watermark from URL: 404 Missing"); + }); + + test("create watermark via File fails serialization across the binding", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "watermarks.generate.fromFile", { + params: { name: "unsupported" }, + }) + ).rejects.toThrow( + 'Could not serialize object of type "File". This type does not support serialization.' + ); + }); + test("list watermarks", async ({ expect }) => { const mf = createMiniflare(); useDispose(mf); @@ -659,6 +1111,79 @@ describe("Stream watermarks", () => { }) ).rejects.toThrow("padding must be between 0.0 and 1.0"); }); + + test("scale out of range throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + await expect( + sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { scale: 1.1 }, + }) + ).rejects.toThrow("scale must be between 0.0 and 1.0"); + }); + + test("boundary values 0.0 and 1.0 are accepted for range params", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + // 0.0 and 1.0 are valid boundary values — should not throw + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { opacity: 0.0, padding: 1.0, scale: 0.0 }, + })) as Watermark; + + expect(watermark.id).toBeTruthy(); + expect(watermark.opacity).toBe(0.0); + expect(watermark.padding).toBe(1.0); + expect(watermark.scale).toBe(0.0); + }); + + test("watermark created with empty name stores empty string", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: {}, + })) as Watermark; + + expect(watermark.name).toBe(""); + }); + + test("create watermark from ReadableStream stores correct size", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: imageUrl } = await useServer( + staticBytesListener(TEST_IMAGE_BYTES) + ); + + // The "watermarks.generate" op fetches the URL and passes resp.body (ReadableStream) + const watermark = (await sendCmdToWorker(mf, "watermarks.generate", { + url: imageUrl.toString(), + params: { name: "stream-wm" }, + })) as Watermark; + + expect(watermark.size).toBe(TEST_IMAGE_BYTES.byteLength); + // downloadedFrom is empty string when created from a stream (not a URL) + expect(watermark.downloadedFrom).toBe(""); + }); }); describe("Stream downloads", () => { @@ -797,6 +1322,71 @@ describe("Stream downloads", () => { }) ).rejects.toThrow("Video not found"); }); + + test("get downloads for non existent video throws", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "downloads.get", { + id: "00000000-0000-0000-0000-000000000000", + }) + ).rejects.toThrow("Video not found"); + }); + + test("generate download is idempotent (upsert)", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const video = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + // Generate the same download twice + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "default", + }); + await sendCmdToWorker(mf, "downloads.generate", { + id: video.id, + type: "default", + }); + + // Should still only have one default download entry + const result = (await sendCmdToWorker(mf, "downloads.get", { + id: video.id, + })) as DownloadGetResponse; + expect(result.default).toBeDefined(); + expect(result.default?.status).toBe("ready"); + // audio should not be present + expect(result.audio).toBeUndefined(); + }); + + test("delete download throws for non-existent video", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect( + sendCmdToWorker(mf, "downloads.delete", { + id: "00000000-0000-0000-0000-000000000000", + type: "default", + }) + ).rejects.toThrow("Download not found"); + }); +}); + +describe("Stream unsupported binding operations", () => { + test("createDirectUpload is not supported", async ({ expect }) => { + const mf = createMiniflare(); + useDispose(mf); + + await expect(sendCmdToWorker(mf, "createDirectUpload")).rejects.toThrow( + "createDirectUpload is not supported in local mode" + ); + }); }); describe("Stream deletes clean up properly", () => { @@ -859,4 +1449,54 @@ describe("Stream deletes clean up properly", () => { })) as Video; expect(details.size).toBe(TEST_VIDEO_BYTES.byteLength); }); + + test("deleting one video does not affect another video's captions and downloads", async ({ + expect, + }) => { + const mf = createMiniflare(); + useDispose(mf); + const { http: videoUrl } = await useServer( + staticBytesListener(TEST_VIDEO_BYTES) + ); + + const videoA = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + const videoB = (await sendCmdToWorker(mf, "upload", { + url: videoUrl.toString(), + })) as Video; + + // Add captions and downloads to both + await sendCmdToWorker(mf, "captions.generate", { + id: videoA.id, + language: "en", + }); + await sendCmdToWorker(mf, "captions.generate", { + id: videoB.id, + language: "en", + }); + await sendCmdToWorker(mf, "downloads.generate", { + id: videoA.id, + type: "default", + }); + await sendCmdToWorker(mf, "downloads.generate", { + id: videoB.id, + type: "default", + }); + + // Delete only video A + await sendCmdToWorker(mf, "video.delete", { id: videoA.id }); + + // videoB's captions and downloads should be unaffected + const captionsB = (await sendCmdToWorker(mf, "captions.list", { + id: videoB.id, + })) as Caption[]; + expect(captionsB).toHaveLength(1); + expect(captionsB[0].language).toBe("en"); + + const downloadsB = (await sendCmdToWorker(mf, "downloads.get", { + id: videoB.id, + })) as DownloadGetResponse; + expect(downloadsB.default).toBeDefined(); + }); }); From c56448e7995ba6e26bad5c3b42ccae8e2bb7655f Mon Sep 17 00:00:00 2001 From: nwong Date: Wed, 25 Mar 2026 11:41:51 -0500 Subject: [PATCH 09/11] chore: run prettify --- .changeset/stream-binding-local-mode.md | 2 ++ packages/miniflare/src/workers/stream/object.worker.ts | 5 ++++- packages/miniflare/test/plugins/stream/index.spec.ts | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.changeset/stream-binding-local-mode.md b/.changeset/stream-binding-local-mode.md index 450962f127..1f800be9f4 100644 --- a/.changeset/stream-binding-local-mode.md +++ b/.changeset/stream-binding-local-mode.md @@ -8,6 +8,7 @@ Add local mode support for Stream bindings Miniflare and `wrangler dev` now support using [Cloudflare Stream](https://developers.cloudflare.com/stream/) bindings locally. Supported operations: + - `upload()` — upload video via URL - `video(id).details()`, `.update()`, `.delete()`, `.generateToken()` - `videos.list()` @@ -16,6 +17,7 @@ Supported operations: - `watermarks.generate()`, `.list()`, `.get()`, `.delete()` The following are not yet supported in local mode and will throw: + - `createDirectUpload()` - Caption upload via `File` - Watermark generation via `File` diff --git a/packages/miniflare/src/workers/stream/object.worker.ts b/packages/miniflare/src/workers/stream/object.worker.ts index 14d87a2bd3..ff7f4b371e 100644 --- a/packages/miniflare/src/workers/stream/object.worker.ts +++ b/packages/miniflare/src/workers/stream/object.worker.ts @@ -340,7 +340,10 @@ export class StreamObject extends DurableObject { } async fetch( - req: Request + req: Request< + unknown, + { miniflare?: { controlOp?: { name: string; args?: unknown[] } } } + > ) { if (this.env[SharedBindings.MAYBE_JSON_ENABLE_CONTROL_ENDPOINTS] === true) { const controlOp = req.cf?.miniflare?.controlOp; diff --git a/packages/miniflare/test/plugins/stream/index.spec.ts b/packages/miniflare/test/plugins/stream/index.spec.ts index acc1ab0383..9d956ab2ec 100644 --- a/packages/miniflare/test/plugins/stream/index.spec.ts +++ b/packages/miniflare/test/plugins/stream/index.spec.ts @@ -6,7 +6,6 @@ import { STREAM_OBJECT_CLASS_NAME, STREAM_PLUGIN_NAME, } from "miniflare"; -import type { MiniflareOptions } from "miniflare"; import { describe, test } from "vitest"; import { MiniflareDurableObjectControlStub, @@ -20,6 +19,7 @@ import type { StreamVideo as Video, StreamWatermark as Watermark, } from "@cloudflare/workers-types"; +import type { MiniflareOptions } from "miniflare"; // Mock image / video bytes const TEST_VIDEO_BYTES = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]); From e71a122c14744c82981d9fc82a281f191d150e23 Mon Sep 17 00:00:00 2001 From: natewong1313 Date: Wed, 25 Mar 2026 13:07:08 -0500 Subject: [PATCH 10/11] fix: fix downloadedFrom Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- packages/miniflare/src/workers/stream/object.worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/miniflare/src/workers/stream/object.worker.ts b/packages/miniflare/src/workers/stream/object.worker.ts index ff7f4b371e..178c8bd659 100644 --- a/packages/miniflare/src/workers/stream/object.worker.ts +++ b/packages/miniflare/src/workers/stream/object.worker.ts @@ -265,7 +265,7 @@ export class StreamObject extends DurableObject { name: params.name ?? "", size, created: now, - downloaded_from: downloadedFrom ?? "", + downloaded_from: downloadedFrom, opacity: params.opacity ?? 1.0, padding: params.padding ?? 0.05, scale: params.scale ?? 0.15, From b2d629d49a930c890135d34311c7104c4f49b7d3 Mon Sep 17 00:00:00 2001 From: nwong Date: Wed, 25 Mar 2026 13:19:51 -0500 Subject: [PATCH 11/11] fix: fix broken test --- packages/miniflare/test/plugins/stream/index.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/miniflare/test/plugins/stream/index.spec.ts b/packages/miniflare/test/plugins/stream/index.spec.ts index 9d956ab2ec..6c9fbf30d6 100644 --- a/packages/miniflare/test/plugins/stream/index.spec.ts +++ b/packages/miniflare/test/plugins/stream/index.spec.ts @@ -1181,8 +1181,8 @@ describe("Stream watermarks", () => { })) as Watermark; expect(watermark.size).toBe(TEST_IMAGE_BYTES.byteLength); - // downloadedFrom is empty string when created from a stream (not a URL) - expect(watermark.downloadedFrom).toBe(""); + // downloadedFrom is null when created from a stream (not a URL) + expect(watermark.downloadedFrom).toBeNull(); }); });