Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions fixtures/dev-registry/tests/dev-registry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,9 @@ describe("Dev Registry: error handling", () => {
);

// The old-format entry has no debugPortAddress, so the proxy should
// return a 503 (not connected) rather than crashing.
// return a 503 with the incompatible version message rather than crashing.
// We wait specifically for the incompatible message (not the generic "not
// found" message) to ensure the file watcher has picked up the entry.
await vi.waitFor(async () => {
const searchParams = new URLSearchParams({
"test-service": "service-worker",
Expand All @@ -1226,7 +1228,7 @@ describe("Dev Registry: error handling", () => {

expect(response.status).toBe(503);
expect(await response.text()).toEqual(
`Worker "service-worker" not found. Make sure it is running locally.`
`Worker "service-worker" is not compatible with this version of the dev server. Please update all Worker instances to the same version.`
);
}, waitForTimeout);
});
Expand Down
8 changes: 1 addition & 7 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2161,7 +2161,7 @@ export class Miniflare {
],
bindings: [
{
name: "DEV_REGISTRY_DEBUG_PORT",
name: CoreBindings.DEV_REGISTRY_DEBUG_PORT,
// workerdDebugPort bindings don't have any additional configuration
workerdDebugPort: kVoid,
},
Expand Down Expand Up @@ -2540,11 +2540,6 @@ export class Miniflare {
this.#runtimeEntryURL !== undefined,
"Runtime entry URL must be set before registering workers"
);
// The loopback address is the workerd entry URL (host:port), used by the
// local explorer for cross-instance HTTP aggregation.
const loopbackAddress = `${this.#runtimeEntryURL.hostname}:${
this.#runtimeEntryURL.port
}`;

const entries: [string, WorkerDefinition][] = [];
for (const workerOpts of this.#workerOpts) {
Expand All @@ -2569,7 +2564,6 @@ export class Miniflare {
debugPortAddress,
defaultEntrypointService,
userWorkerService: getUserServiceName(workerOpts.core.name),
loopbackAddress,
},
]);
}
Expand Down
12 changes: 11 additions & 1 deletion packages/miniflare/src/plugins/core/explorer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import assert from "node:assert";
import SCRIPT_DO_WRAPPER from "worker:core/do-wrapper";
import SCRIPT_LOCAL_EXPLORER from "worker:local-explorer/explorer";
import {
kVoid,
type Service,
type Worker_Binding,
type Worker_Module,
} from "../../runtime";
import { CoreBindings } from "../../workers";
import { normaliseDurableObject } from "../do";
import {
Expand All @@ -14,7 +20,6 @@ import {
SERVICE_LOCAL_EXPLORER,
} from "./constants";
import type { PluginWorkerOptions } from "..";
import type { Service, Worker_Binding, Worker_Module } from "../../runtime";
import type { DurableObjectClassNames, WorkflowOption } from "../shared";
import type {
BindingIdMap,
Expand Down Expand Up @@ -81,6 +86,11 @@ export function getExplorerServices(
name: CoreBindings.JSON_TELEMETRY_CONFIG,
json: JSON.stringify(telemetry),
},
{
name: CoreBindings.DEV_REGISTRY_DEBUG_PORT,
// workerdDebugPort bindings don't have any additional configuration
workerdDebugPort: kVoid,
},
];

if (hasDurableObjects) {
Expand Down
1 change: 0 additions & 1 deletion packages/miniflare/src/shared/DEV_REGISTRY.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ type WorkerDefinition = {
debugPortAddress: string; // e.g. "127.0.0.1:12345"
defaultEntrypointService: string; // workerd service name for default entrypoint
userWorkerService: string; // workerd service name bypassing asset proxies
loopbackAddress: string; // e.g. "127.0.0.1:8787"
};
```

Expand Down
5 changes: 0 additions & 5 deletions packages/miniflare/src/shared/dev-registry-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,4 @@ export type WorkerDefinition = {
* workers it bypasses the Assets proxy (whether built-in or userland)
*/
userWorkerService: string;
/**
* HTTP loopback address for this miniflare instance (e.g. "127.0.0.1:8787").
* Used by the local explorer for cross-instance aggregation.
*/
loopbackAddress: string;
};
1 change: 1 addition & 0 deletions packages/miniflare/src/workers/core/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export const CoreBindings = {
SERVICE_CACHE: "MINIFLARE_CACHE",
SERVICE_DEV_REGISTRY_PROXY: "MINIFLARE_DEV_REGISTRY_PROXY",
JSON_TELEMETRY_CONFIG: "MINIFLARE_TELEMETRY_CONFIG",
DEV_REGISTRY_DEBUG_PORT: "DEV_REGISTRY_DEBUG_PORT",
} as const;

export const ProxyOps = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,29 @@ export function setRegistry(data: Record<string, RegistryEntry>): void {
* Look up a worker's registry entry by service name.
*/
export function resolveTarget(service: string): RegistryEntry | undefined {
return registry.get(service);
const entry = registry.get(service);
if (!entry || !("debugPortAddress" in entry)) {
return undefined;
}
return entry;
}

/**
* Check whether a registry entry exists for the given service, even if it's
* from an incompatible wrangler version.
*/
export function hasRegistryEntry(service: string): boolean {
return registry.has(service);
}

/**
* Return an appropriate error message for a worker that can't be resolved.
*/
export function workerNotFoundMessage(service: string): string {
if (hasRegistryEntry(service)) {
return `Worker "${service}" is not compatible with this version of the dev server. Please update all Worker instances to the same version.`;
Comment thread
penalosa marked this conversation as resolved.
}
return `Worker "${service}" not found. Make sure it is running locally.`;
}

/**
Expand Down Expand Up @@ -130,9 +152,7 @@ export function createProxyDurableObjectClass({
// workerd probes DO properties (fetch, alarm, etc.) via the get
// trap, and throwing here would crash those internal checks.
return () => {
throw new Error(
`Worker "${scriptName}" not found. Make sure it is running locally.`
);
throw new Error(workerNotFoundMessage(scriptName));
};
}
return Reflect.get(fetcher, prop);
Expand All @@ -144,10 +164,7 @@ export function createProxyDurableObjectClass({
const fetcher = this._resolve();
if (!fetcher) {
return Promise.resolve(
new Response(
`Worker "${scriptName}" not found. Make sure it is running locally.`,
{ status: 503 }
)
new Response(workerNotFoundMessage(scriptName), { status: 503 })
);
}
return fetcher.fetch(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
resolveTarget,
tailEventsReplacer,
tailEventsReviver,
workerNotFoundMessage,
} from "./dev-registry-proxy-shared.worker";
import type { WorkerdDebugPortConnector } from "./dev-registry-proxy-shared.worker";

Expand Down Expand Up @@ -75,9 +76,7 @@ export class ExternalServiceProxy extends WorkerEntrypoint<Env, Props> {
}

if (!target._fetcher) {
throw new Error(
`Worker "${ctx.props.service}" not found. Make sure it is running locally.`
);
throw new Error(workerNotFoundMessage(ctx.props.service));
}
return Reflect.get(target._fetcher, prop);
},
Expand All @@ -86,19 +85,16 @@ export class ExternalServiceProxy extends WorkerEntrypoint<Env, Props> {

fetch(request: Request): Promise<Response> | Response {
if (!this._fetcher) {
return new Response(
`Worker "${this.ctx.props.service}" not found. Make sure it is running locally.`,
{ status: 503 }
);
return new Response(workerNotFoundMessage(this.ctx.props.service), {
status: 503,
});
}
return this._fetcher.fetch(request);
}

async scheduled(controller: ScheduledController) {
if (!this._entryFetcher) {
throw new Error(
`Worker "${this.ctx.props.service}" not found. Make sure it is running locally.`
);
throw new Error(workerNotFoundMessage(this.ctx.props.service));
}
const params = new URLSearchParams();
if (controller.cron) {
Expand Down
24 changes: 15 additions & 9 deletions packages/miniflare/src/workers/local-explorer/aggregation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* any one instance can aggregate data from all instances.
*/

import { env } from "cloudflare:workers";
import { CorePaths } from "../core";
import type { WorkerRegistry } from "../../shared/dev-registry-types";
import type { AppContext } from "./common";
Expand All @@ -21,18 +22,19 @@ export const NO_AGGREGATE_HEADER = "X-Miniflare-Explorer-No-Aggregate";
* Get the unique base URLs of peer instances from the dev registry,
* excluding the current instance (identified by worker names).
*/
function getPeerUrls(
function getPeerDebugPortAddresses(
registry: WorkerRegistry,
selfWorkerNames: string[]
): string[] {
const selfSet = new Set(selfWorkerNames);
const urls = Object.entries(registry)
const addresses = Object.entries(registry)
.filter(([name]) => !selfSet.has(name))
.map(([, def]) => `http://${def.loopbackAddress}`);
.map(([, def]) => def.debugPortAddress)
.filter((addr): addr is string => typeof addr === "string");
// A single Miniflare process with multiple workers registers multiple
// entries in the registry, all sharing the same host:port. We deduplicate
// to avoid fetching from the same peer multiple times.
return [...new Set(urls)];
return [...new Set(addresses)];
}

export async function getPeerUrlsIfAggregating(
Expand All @@ -45,25 +47,29 @@ export async function getPeerUrlsIfAggregating(
const workerNames = c.env.LOCAL_EXPLORER_WORKER_NAMES;
const response = await loopback.fetch("http://localhost/core/dev-registry");
const registry = (await response.json()) as WorkerRegistry;
return getPeerUrls(registry, workerNames);
return getPeerDebugPortAddresses(registry, workerNames);
}

/**
* Fetch data from a peer instance's explorer API.
* Returns null on any error (silent omission policy).
*
* @param peerUrl - Base URL of the peer instance (e.g., "http://127.0.0.1:8788")
* @param peerDebugPortAddress - Debug port address of the peer instance (e.g., "127.0.0.1:12345")
* @param apiPath - API path relative to the explorer API base (e.g., "/d1/database")
* @param init - Optional fetch init options
*/
export async function fetchFromPeer(
peerUrl: string,
peerDebugPortAddress: string,
apiPath: string,
init?: RequestInit
): Promise<Response | null> {
try {
const url = new URL(`${EXPLORER_API_PATH}${apiPath}`, peerUrl);
const response = await fetch(url.toString(), {
const client = (env as AppContext["env"]).DEV_REGISTRY_DEBUG_PORT.connect(
peerDebugPortAddress
);
const fetcher = client.getEntrypoint("core:entry");
const url = new URL(`http://localhost${EXPLORER_API_PATH}${apiPath}`);
const response = await fetcher.fetch(url.toString(), {
...init,
headers: {
...(init?.headers as Record<string, string> | undefined),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import type {
} from "../../plugins/core/types";
import type { WorkerRegistry } from "../../shared/dev-registry-types";
import type { CoreBindings } from "../core";
import type { WorkerdDebugPortConnector } from "../core/dev-registry-proxy-shared.worker";
import type { LocalExplorerWorker } from "./generated";

export type Env = {
Expand All @@ -70,6 +71,7 @@ export type Env = {
// Per-worker resource bindings for the /local/workers endpoint
[CoreBindings.JSON_EXPLORER_WORKER_OPTS]: ExplorerWorkerOpts;
[CoreBindings.JSON_TELEMETRY_CONFIG]: { enabled: boolean; deviceId?: string };
[CoreBindings.DEV_REGISTRY_DEBUG_PORT]: WorkerdDebugPortConnector;
};

export type AppBindings = { Bindings: Env };
Expand Down
Loading