Skip to content
25 changes: 25 additions & 0 deletions .changeset/stream-binding-local-mode.md
Comment thread
natewong1313 marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"miniflare": minor
"wrangler": minor
---

Add local mode support for Stream bindings

Miniflare and `wrangler dev` now support using [Cloudflare Stream](https://developers.cloudflare.com/stream/) bindings locally.

Supported operations:

- `upload()` — upload video via URL
- `video(id).details()`, `.update()`, `.delete()`, `.generateToken()`
- `videos.list()`
- `captions.generate()`, `.list()`, `.delete()`
- `downloads.generate()`, `.get()`, `.delete()`
- `watermarks.generate()`, `.list()`, `.get()`, `.delete()`

The following are not yet supported in local mode and will throw:

- `createDirectUpload()`
- Caption upload via `File`
- Watermark generation via `File`

Data is persisted across restarts by default. You must set `streamPersist: false` in Miniflare options to disable persistence.
12 changes: 11 additions & 1 deletion packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import {
SharedOptions,
SOCKET_ENTRY,
SOCKET_ENTRY_LOCAL,
STREAM_PLUGIN_NAME,
WorkerOptions,
WrappedBindingNames,
} from "./plugins";
Expand Down Expand Up @@ -150,6 +151,7 @@ import type {
KVNamespaceListKey,
Queue,
R2Bucket,
StreamBinding,
} from "@cloudflare/workers-types/experimental";
import type { Process } from "@puppeteer/browsers";

Expand Down Expand Up @@ -2167,7 +2169,9 @@ export class Miniflare {
const urlSafeHost = getURLSafeHost(configuredHost);
if (this.#sharedOpts.core.logRequests) {
this.#log.logReady(
`${ready} on ${green(`${secure ? "https" : "http"}://${urlSafeHost}:${entryPort}`)}`
`${ready} on ${green(
`${secure ? "https" : "http"}://${urlSafeHost}:${entryPort}`
)}`
);
}

Expand Down Expand Up @@ -2763,6 +2767,12 @@ export class Miniflare {
}> {
return this.#getProxy(HELLO_WORLD_PLUGIN_NAME, bindingName, workerName);
}
getStreamBinding(
bindingName: string,
workerName?: string
): Promise<StreamBinding> {
return this.#getProxy(STREAM_PLUGIN_NAME, bindingName, workerName);
}

/** @internal */
_getInternalDurableObjectNamespace(
Expand Down
5 changes: 5 additions & 0 deletions packages/miniflare/src/plugins/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { QUEUES_PLUGIN, QUEUES_PLUGIN_NAME } from "./queues";
import { R2_PLUGIN, R2_PLUGIN_NAME } from "./r2";
import { RATELIMIT_PLUGIN, RATELIMIT_PLUGIN_NAME } from "./ratelimit";
import { SECRET_STORE_PLUGIN, SECRET_STORE_PLUGIN_NAME } from "./secret-store";
import { STREAM_PLUGIN, STREAM_PLUGIN_NAME } from "./stream";
import { VECTORIZE_PLUGIN, VECTORIZE_PLUGIN_NAME } from "./vectorize";
import {
VERSION_METADATA_PLUGIN,
Expand Down Expand Up @@ -63,6 +64,7 @@ export const PLUGINS = {
[BROWSER_RENDERING_PLUGIN_NAME]: BROWSER_RENDERING_PLUGIN,
[DISPATCH_NAMESPACE_PLUGIN_NAME]: DISPATCH_NAMESPACE_PLUGIN,
[IMAGES_PLUGIN_NAME]: IMAGES_PLUGIN,
[STREAM_PLUGIN_NAME]: STREAM_PLUGIN,
[VECTORIZE_PLUGIN_NAME]: VECTORIZE_PLUGIN,
[VPC_SERVICES_PLUGIN_NAME]: VPC_SERVICES_PLUGIN,
[MTLS_PLUGIN_NAME]: MTLS_PLUGIN,
Expand Down Expand Up @@ -127,6 +129,7 @@ export type WorkerOptions = z.input<typeof CORE_PLUGIN.options> &
z.input<typeof BROWSER_RENDERING_PLUGIN.options> &
z.input<typeof DISPATCH_NAMESPACE_PLUGIN.options> &
z.input<typeof IMAGES_PLUGIN.options> &
z.input<typeof STREAM_PLUGIN.options> &
z.input<typeof VECTORIZE_PLUGIN.options> &
z.input<typeof VPC_SERVICES_PLUGIN.options> &
z.input<typeof MTLS_PLUGIN.options> &
Expand All @@ -145,6 +148,7 @@ export type SharedOptions = z.input<typeof CORE_PLUGIN.sharedOptions> &
z.input<typeof SECRET_STORE_PLUGIN.sharedOptions> &
z.input<typeof ANALYTICS_ENGINE_PLUGIN.sharedOptions> &
z.input<typeof IMAGES_PLUGIN.sharedOptions> &
z.input<typeof STREAM_PLUGIN.sharedOptions> &
z.input<typeof HELLO_WORLD_PLUGIN.sharedOptions>;

export const PLUGIN_ENTRIES = Object.entries(PLUGINS) as [
Expand Down Expand Up @@ -206,6 +210,7 @@ export * from "./ai";
export * from "./browser-rendering";
export * from "./dispatch-namespace";
export * from "./images";
export * from "./stream";
export * from "./vectorize";
export * from "./vpc-services";
export * from "./mtls";
Expand Down
185 changes: 185 additions & 0 deletions packages/miniflare/src/plugins/stream/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import fs from "node:fs/promises";
import BINDING_SCRIPT from "worker:stream/binding";
import OBJECT_SCRIPT from "worker:stream/object";
import { z } from "zod";
import { SharedBindings } from "../../workers";
import {
getMiniflareObjectBindings,
getPersistPath,
getUserBindingServiceName,
PersistenceSchema,
Plugin,
ProxyNodeBinding,
remoteProxyClientWorker,
RemoteProxyConnectionString,
} from "../shared";
import type { Service } from "../../runtime";

const StreamSchema = z.object({
binding: z.string(),
remoteProxyConnectionString: z
.custom<RemoteProxyConnectionString>()
.optional(),
});

export const StreamOptionsSchema = z.object({
stream: StreamSchema.optional(),
});

export const StreamSharedOptionsSchema = z.object({
streamPersist: PersistenceSchema,
});

export const STREAM_PLUGIN_NAME = "stream";
const STREAM_STORAGE_SERVICE_NAME = `${STREAM_PLUGIN_NAME}:storage`;
const STREAM_OBJECT_SERVICE_NAME = `${STREAM_PLUGIN_NAME}:object`;
export const STREAM_OBJECT_CLASS_NAME = "StreamObject";

export const STREAM_COMPAT_DATE = "2026-03-23";

export const STREAM_PLUGIN: Plugin<
typeof StreamOptionsSchema,
typeof StreamSharedOptionsSchema
> = {
options: StreamOptionsSchema,
sharedOptions: StreamSharedOptionsSchema,
async getBindings(options) {
if (!options.stream) {
return [];
}

return [
{
name: options.stream.binding,
service: {
name: getUserBindingServiceName(
STREAM_PLUGIN_NAME,
options.stream.binding,
options.stream.remoteProxyConnectionString
),
entrypoint: "StreamBinding",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@natewong1313 - I think there is a bug here. When in remote mode the binding proxy doesn't have a named entry point. It just has a default one. So this should be:

Suggested change
entrypoint: "StreamBinding",
entrypoint: options.stream.remoteProxyConnectionString ? undefined : "StreamBinding",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#13476

I was already making the same change for Flagship, so I made the same patch for this as well. Feel free to discard it if it doesn't look right!

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roerohan Thank you 🙏

},
},
];
},
getNodeBindings(options: z.infer<typeof StreamOptionsSchema>) {
if (!options.stream) {
return {};
}
return {
[options.stream.binding]: new ProxyNodeBinding(),
};
},
async getServices({
options,
sharedOptions,
tmpPath,
defaultPersistRoot,
unsafeStickyBlobs,
}) {
if (!options.stream) {
return [];
}

const serviceName = getUserBindingServiceName(
STREAM_PLUGIN_NAME,
options.stream.binding,
options.stream.remoteProxyConnectionString
);

if (options.stream.remoteProxyConnectionString) {
return [
{
name: serviceName,
worker: remoteProxyClientWorker(
options.stream.remoteProxyConnectionString,
options.stream.binding
),
},
];
}

const persistPath = getPersistPath(
STREAM_PLUGIN_NAME,
tmpPath,
defaultPersistRoot,
sharedOptions.streamPersist
);
await fs.mkdir(persistPath, { recursive: true });

// Disk storage for blobs and SQL
const storageService = {
name: STREAM_STORAGE_SERVICE_NAME,
disk: { path: persistPath, writable: true },
} satisfies Service;

// StreamObject
const objectService = {
name: STREAM_OBJECT_SERVICE_NAME,
worker: {
compatibilityDate: STREAM_COMPAT_DATE,
compatibilityFlags: ["nodejs_compat", "experimental"],
modules: [
{
name: "object.worker.js",
esModule: OBJECT_SCRIPT(),
},
],
durableObjectNamespaces: [
{
className: STREAM_OBJECT_CLASS_NAME,
uniqueKey: `miniflare-${STREAM_OBJECT_CLASS_NAME}`,
enableSql: true,
},
],
durableObjectStorage: { localDisk: STREAM_STORAGE_SERVICE_NAME },
bindings: [
{
name: SharedBindings.MAYBE_SERVICE_BLOBS,
service: { name: STREAM_STORAGE_SERVICE_NAME },
},
...getMiniflareObjectBindings(unsafeStickyBlobs),
],
// Allow the DO to send outbound HTTP requests (fetching watermark images)
globalOutbound: { name: "internet" },
},
} satisfies Service;

// Entrypoint with RPC
const bindingService = {
name: serviceName,
worker: {
compatibilityDate: STREAM_COMPAT_DATE,
compatibilityFlags: ["nodejs_compat", "experimental"],
modules: [
{
name: "binding.worker.js",
esModule: BINDING_SCRIPT(),
},
],
bindings: [
{
name: "store",
durableObjectNamespace: {
className: STREAM_OBJECT_CLASS_NAME,
serviceName: STREAM_OBJECT_SERVICE_NAME,
},
},
],
// Allow the binding worker to send outbound HTTP requests
// (e.g. fetching video from URL in upload fn)
globalOutbound: { name: "internet" },
},
} satisfies Service;

return [storageService, objectService, bindingService];
},
getPersistPath({ streamPersist }, tmpPath) {
return getPersistPath(
STREAM_PLUGIN_NAME,
tmpPath,
undefined,
streamPersist
);
},
};
2 changes: 1 addition & 1 deletion packages/miniflare/src/workers/shared/index.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export {
} from "./router.worker";
export type { RouteHandler } from "./router.worker";

export { get, all, drain } from "./sql.worker";
export { get, all, drain, createTypedSql } from "./sql.worker";
export type {
TypedValue,
TypedResult,
Expand Down
Loading
Loading