From b99e43778bcfbc13d85a0ae5914f6cbddc7f8a32 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Wed, 17 Jan 2024 13:16:55 -0800 Subject: [PATCH] wasi: correct tcp state implementation --- packages/preview2-shim/lib/io/calls.js | 4 +- .../preview2-shim/lib/io/worker-socket-tcp.js | 259 +++++++++--------- .../preview2-shim/lib/io/worker-thread.js | 147 +++++----- packages/preview2-shim/lib/nodejs/http.js | 48 ++-- packages/preview2-shim/lib/nodejs/sockets.js | 37 ++- packages/preview2-shim/test/test.js | 2 + 6 files changed, 260 insertions(+), 237 deletions(-) diff --git a/packages/preview2-shim/lib/io/calls.js b/packages/preview2-shim/lib/io/calls.js index feb7d092d..cf0e82502 100644 --- a/packages/preview2-shim/lib/io/calls.js +++ b/packages/preview2-shim/lib/io/calls.js @@ -50,7 +50,7 @@ export const POLL_POLL_LIST = ++call_id << CALL_SHIFT; // Futures export const FUTURE_DISPOSE = ++call_id << CALL_SHIFT; -export const FUTURE_GET_VALUE_AND_DISPOSE = ++call_id << CALL_SHIFT; +export const FUTURE_TAKE_VALUE = ++call_id << CALL_SHIFT; export const FUTURE_SUBSCRIBE = ++call_id << CALL_SHIFT; // Http @@ -102,7 +102,7 @@ export const SOCKET_UDP_SET_SEND_BUFFER_SIZE = ++call_id << CALL_SHIFT; export const SOCKET_UDP_SET_UNICAST_HOP_LIMIT = ++call_id << CALL_SHIFT; // Name lookup export const SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST = ++call_id << CALL_SHIFT; -export const SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST = ++call_id << CALL_SHIFT; +export const SOCKET_RESOLVE_ADDRESS_TAKE_REQUEST = ++call_id << CALL_SHIFT; export const SOCKET_RESOLVE_ADDRESS_SUBSCRIBE_REQUEST = ++call_id << CALL_SHIFT; export const SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST = ++call_id << CALL_SHIFT; diff --git a/packages/preview2-shim/lib/io/worker-socket-tcp.js b/packages/preview2-shim/lib/io/worker-socket-tcp.js index f61358251..e60403662 100644 --- a/packages/preview2-shim/lib/io/worker-socket-tcp.js +++ b/packages/preview2-shim/lib/io/worker-socket-tcp.js @@ -1,13 +1,14 @@ import { + createFuture, createPoll, createReadableStream, createReadableStreamPollState, createWritableStream, + futureDispose, + futureTakeValue, pollStateReady, - pollStateWait, verifyPollsDroppedForDrop, } from "./worker-thread.js"; -// See: https://github.com/nodejs/node/blob/main/src/tcp_wrap.cc const { TCP, constants: TCPConstants } = process.binding("tcp_wrap"); import { deserializeIpAddress, @@ -33,14 +34,14 @@ const globalBoundAddresses = new Set(); const isWindows = platform() === "win32"; let stateCnt = 0; -const SOCKET_STATE_INIT = ++stateCnt; -const SOCKET_STATE_BIND = ++stateCnt; -const SOCKET_STATE_BOUND = ++stateCnt; -const SOCKET_STATE_LISTEN = ++stateCnt; -const SOCKET_STATE_LISTENER = ++stateCnt; -const SOCKET_STATE_CONNECT = ++stateCnt; -const SOCKET_STATE_CONNECTION = ++stateCnt; -const SOCKET_STATE_ERROR = ++stateCnt; +export const SOCKET_STATE_INIT = ++stateCnt; +export const SOCKET_STATE_BIND = ++stateCnt; +export const SOCKET_STATE_BOUND = ++stateCnt; +export const SOCKET_STATE_LISTEN = ++stateCnt; +export const SOCKET_STATE_LISTENER = ++stateCnt; +export const SOCKET_STATE_CONNECT = ++stateCnt; +export const SOCKET_STATE_CONNECTION = ++stateCnt; +export const SOCKET_STATE_CLOSED = ++stateCnt; /** * @typedef {import("../../types/interfaces/wasi-sockets-network.js").IpSocketAddress} IpSocketAddress @@ -54,7 +55,7 @@ const SOCKET_STATE_ERROR = ++stateCnt; * * @typedef {{ * state: number, - * bindOrConnectAddress: IpSocketAddress | null, + * future: number | null, * serializedLocalAddress: string | null, * listenBacklogSize: number, * handle: TCP, @@ -77,58 +78,73 @@ export function createTcpSocket() { const handle = new TCP(TCPConstants.SOCKET); tcpSockets.set(++tcpSocketCnt, { state: SOCKET_STATE_INIT, - bindOrConnectAddress: null, + future: null, serializedLocalAddress: null, listenBacklogSize: 128, handle, pendingAccepts: [], - pollState: { ready: false, listener: null, polls: [] }, + pollState: { ready: true, listener: null, polls: [], parentStream: null }, }); return tcpSocketCnt; } export function socketTcpSubscribe(id) { - const socket = tcpSockets.get(id); - return createPoll(socket.pollState); + return createPoll(tcpSockets.get(id).pollState); } -export function socketTcpBindStart(id, localAddress) { +export function socketTcpFinish(id, fromState, toState) { const socket = tcpSockets.get(id); - if (socket.state !== SOCKET_STATE_INIT) throw "invalid-state"; - socket.state = SOCKET_STATE_BIND; - socket.bindOrConnectAddress = localAddress; - pollStateWait(socket.pollState); + if (socket.state !== fromState) throw "not-in-progress"; + if (!socket.pollState.ready) throw "would-block"; + const { tag, val } = futureTakeValue(socket.future).val; + futureDispose(socket.future, false); + socket.future = null; + if (tag === "err") { + socket.state = SOCKET_STATE_CLOSED; + throw val; + } else { + socket.state = toState; + // for the listener, we must immediately transition back to unresolved + if (toState === SOCKET_STATE_LISTENER) + socket.pollState.ready = false; + return val; + } } -export function socketTcpBindFinish(id) { +export function socketTcpBindStart(id, localAddress, family) { const socket = tcpSockets.get(id); - if (socket.state !== SOCKET_STATE_BIND) throw "not-in-progress"; + if (socket.state !== SOCKET_STATE_INIT) throw "invalid-state"; + if (family !== localAddress.tag || !isUnicastIpAddress(localAddress)) + throw "invalid-argument"; + if (isIPv4MappedAddress(localAddress)) throw "invalid-argument"; + socket.state = SOCKET_STATE_BIND; const { handle } = socket; - const address = serializeIpAddress(socket.bindOrConnectAddress); - if (isIPv4MappedAddress(socket.bindOrConnectAddress)) - throw 'invalid-argument'; - const port = socket.bindOrConnectAddress.val.port; - if (globalBoundAddresses.has(`${address}:${port}`)) throw "address-in-use"; - const code = - socket.bindOrConnectAddress.tag === "ipv6" - ? handle.bind6(address, port, TCPConstants.UV_TCP_IPV6ONLY) - : handle.bind(address, port); - if (code !== 0) { - socket.state = SOCKET_STATE_ERROR; - throw convertSocketErrorCode(-code); - } - const localAddress = socketTcpGetLocalAddress(id); - const serializedLocalAddress = `${serializeIpAddress(localAddress)}:${ - localAddress.val.port - }`; - globalBoundAddresses.add( - (socket.serializedLocalAddress = serializedLocalAddress) + socket.future = createFuture( + (async () => { + const address = serializeIpAddress(localAddress); + const port = localAddress.val.port; + if (globalBoundAddresses.has(`${address}:${port}`)) + throw "address-in-use"; + const code = + localAddress.tag === "ipv6" + ? handle.bind6(address, port, TCPConstants.UV_TCP_IPV6ONLY) + : handle.bind(address, port); + if (code !== 0) throw convertSocketErrorCode(-code); + { + const localAddress = socketTcpGetLocalAddress(id); + const serializedLocalAddress = `${serializeIpAddress(localAddress)}:${ + localAddress.val.port + }`; + globalBoundAddresses.add( + (socket.serializedLocalAddress = serializedLocalAddress) + ); + } + })(), + socket.pollState ); - socket.state = SOCKET_STATE_BOUND; - pollStateReady(socket.pollState, false); } -export function socketTcpConnectStart(id, { remoteAddress, family }) { +export function socketTcpConnectStart(id, remoteAddress, family) { const socket = tcpSockets.get(id); if (socket.state !== SOCKET_STATE_INIT && socket.state !== SOCKET_STATE_BOUND) throw "invalid-state"; @@ -142,53 +158,80 @@ export function socketTcpConnectStart(id, { remoteAddress, family }) { ) { throw "invalid-argument"; } + if (isIPv4MappedAddress(remoteAddress)) throw "invalid-argument"; socket.state = SOCKET_STATE_CONNECT; - socket.bindOrConnectAddress = remoteAddress; - pollStateWait(socket.pollState); + socket.future = createFuture( + new Promise((resolve, reject) => { + const tcpSocket = new Socket({ + handle: socket.handle, + pauseOnCreate: true, + allowHalfOpen: true, + }); + function handleErr(err) { + tcpSocket.off("connect", handleConnect); + reject(err); + } + function handleConnect() { + tcpSocket.off("error", handleErr); + if (!tcpSocket.serializedLocalAddress) { + const localAddress = socketTcpGetLocalAddress(id); + const serializedLocalAddress = `${serializeIpAddress(localAddress)}:${ + localAddress.val.port + }`; + globalBoundAddresses.add( + (tcpSocket.serializedLocalAddress = serializedLocalAddress) + ); + } + resolve([ + createReadableStream(tcpSocket), + createWritableStream(tcpSocket), + ]); + } + tcpSocket.once("connect", handleConnect); + tcpSocket.once("error", handleErr); + tcpSocket.connect({ + port: remoteAddress.val.port, + host: serializeIpAddress(remoteAddress), + lookup: () => { + throw "invalid-argument"; + }, + }); + }), + socket.pollState + ); } -export function socketTcpConnectFinish(id) { +export function socketTcpListenStart(id) { const socket = tcpSockets.get(id); - if (socket.state !== SOCKET_STATE_CONNECT) throw "not-in-progress"; - const tcpSocket = new Socket({ handle: socket.handle, pauseOnCreate: true, allowHalfOpen: true }); - const remoteAddress = socket.bindOrConnectAddress; - if (isIPv4MappedAddress(remoteAddress)) - throw "invalid-argument"; - return new Promise((resolve, reject) => { - function handleErr(err) { - tcpSocket.off("connect", handleConnect); - socket.state = SOCKET_STATE_ERROR; - pollStateReady(socket.pollState, false); - reject(err); - } - function handleConnect() { - tcpSocket.off("error", handleErr); - if (!tcpSocket.serializedLocalAddress) { - const localAddress = socketTcpGetLocalAddress(id); - const serializedLocalAddress = `${serializeIpAddress(localAddress)}:${ - localAddress.val.port - }`; - globalBoundAddresses.add( - (tcpSocket.serializedLocalAddress = serializedLocalAddress) - ); + if (socket.state !== SOCKET_STATE_BOUND) throw "invalid-state"; + const { handle } = socket; + socket.state = SOCKET_STATE_LISTEN; + socket.future = createFuture( + new Promise((resolve, reject) => { + const server = new Server({ pauseOnConnect: true, allowHalfOpen: true }); + function handleErr(err) { + server.off("listening", handleListen); + reject(err); } - socket.state = SOCKET_STATE_CONNECTION; - pollStateReady(socket.pollState, false); - resolve([ - createReadableStream(tcpSocket), - createWritableStream(tcpSocket), - ]); - } - tcpSocket.once("connect", handleConnect); - tcpSocket.once("error", handleErr); - tcpSocket.connect({ - port: remoteAddress.val.port, - host: serializeIpAddress(remoteAddress), - lookup: () => { - throw "invalid-argument"; - }, - }); - }); + function handleListen() { + server.off("error", handleErr); + server.on("connection", (tcpSocket) => { + pollStateReady(socket.pollState); + const pollState = createReadableStreamPollState(tcpSocket); + socket.pendingAccepts.push({ tcpSocket, err: null, pollState }); + }); + server.on("error", (err) => { + pollStateReady(socket.pollState); + socket.pendingAccepts.push({ tcpSocket: null, err, pollState: null }); + }); + resolve(); + } + server.once("listening", handleListen); + server.once("error", handleErr); + server.listen(handle, socket.listenBacklogSize); + }), + socket.pollState + ); } export function socketTcpAccept(id) { @@ -196,10 +239,15 @@ export function socketTcpAccept(id) { if (socket.state !== SOCKET_STATE_LISTENER) throw "invalid-state"; if (socket.pendingAccepts.length === 0) throw "would-block"; const accept = socket.pendingAccepts.shift(); - if (accept.err) throw convertSocketError(accept.err); + if (accept.err) { + socket.state = SOCKET_STATE_CLOSED; + throw convertSocketError(accept.err); + } + if (socket.pendingAccepts.length === 0) + socket.pollState.ready = false; tcpSockets.set(++tcpSocketCnt, { state: SOCKET_STATE_CONNECTION, - bindOrConnectAddress: null, + future: null, serializedLocalAddress: null, listenBacklogSize: 128, handle: accept.tcpSocket._handle, @@ -213,41 +261,6 @@ export function socketTcpAccept(id) { ]; } -export function socketTcpListenStart(id) { - const socket = tcpSockets.get(id); - if (socket.state !== SOCKET_STATE_BOUND) throw "invalid-state"; - socket.state = SOCKET_STATE_LISTEN; -} - -export function socketTcpListenFinish(id, backlogSize) { - const socket = tcpSockets.get(id); - if (socket.state !== SOCKET_STATE_LISTEN) throw "not-in-progress"; - const { handle } = socket; - const server = new Server({ pauseOnConnect: true, allowHalfOpen: true }); - return new Promise((resolve, reject) => { - function handleErr(err) { - server.off("listening", handleListen); - socket.state = SOCKET_STATE_ERROR; - reject(err); - } - function handleListen() { - server.off("error", handleErr); - server.on("connection", (tcpSocket) => { - const pollState = createReadableStreamPollState(tcpSocket); - socket.pendingAccepts.push({ tcpSocket, err: null, pollState }); - }); - server.on("error", (err) => { - socket.pendingAccepts.push({ tcpSocket: null, err, pollState: null }); - }); - socket.state = SOCKET_STATE_LISTENER; - resolve(); - } - server.once("listening", handleListen); - server.once("error", handleErr); - server.listen(handle, backlogSize); - }); -} - export function socketTcpIsListening(id) { return tcpSockets.get(id).state === SOCKET_STATE_LISTENER; } diff --git a/packages/preview2-shim/lib/io/worker-thread.js b/packages/preview2-shim/lib/io/worker-thread.js index 6fad05832..c27ec2e48 100644 --- a/packages/preview2-shim/lib/io/worker-thread.js +++ b/packages/preview2-shim/lib/io/worker-thread.js @@ -24,7 +24,7 @@ import { FILE, FUTURE_DISPOSE, FUTURE_SUBSCRIBE, - FUTURE_GET_VALUE_AND_DISPOSE, + FUTURE_TAKE_VALUE, HTTP, HTTP_CREATE_REQUEST, HTTP_OUTPUT_STREAM_FINISH, @@ -58,7 +58,7 @@ import { SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST, SOCKET_RESOLVE_ADDRESS_SUBSCRIBE_REQUEST, SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST, - SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST, + SOCKET_RESOLVE_ADDRESS_TAKE_REQUEST, SOCKET_TCP_ACCEPT, SOCKET_TCP_BIND_START, SOCKET_TCP_BIND_FINISH, @@ -97,19 +97,23 @@ import { createTcpSocket, socketTcpAccept, socketTcpBindStart, - socketTcpBindFinish, + socketTcpFinish, socketTcpConnectStart, - socketTcpConnectFinish, socketTcpDispose, socketTcpGetLocalAddress, socketTcpGetRemoteAddress, socketTcpListenStart, - socketTcpListenFinish, socketTcpIsListening, socketTcpSetListenBacklogSize, socketTcpSetKeepAlive, socketTcpShutdown, socketTcpSubscribe, + SOCKET_STATE_BIND, + SOCKET_STATE_BOUND, + SOCKET_STATE_CONNECT, + SOCKET_STATE_CONNECTION, + SOCKET_STATE_LISTEN, + SOCKET_STATE_LISTENER, } from "./worker-socket-tcp.js"; import { SocketUdpReceive, @@ -146,8 +150,10 @@ let pollCnt = 0, * }} Stream * * @typedef {{ - * value: any, - * error: bool, + * future: { + * tag: 'ok' | 'err', + * val: any, + * }, * pollState * }} Future */ @@ -372,12 +378,12 @@ function handle(call, id, payload) { case SOCKET_RESOLVE_ADDRESS_SUBSCRIBE_REQUEST: return createPoll(futures.get(id).pollState); case SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST: - return void futures.delete(id); - case SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST: { - const future = futures.get(id); - if (!future.pollState.ready) throw "would-block"; - futures.delete(id); - return future; + return void futureDispose(id, true); + case SOCKET_RESOLVE_ADDRESS_TAKE_REQUEST: { + const val = futureTakeValue(id); + if (val === undefined) throw "would-block"; + // double take avoidance is ensured + return val.val; } // Sockets TCP @@ -386,17 +392,17 @@ function handle(call, id, payload) { case SOCKET_TCP_CREATE_HANDLE: return createTcpSocket(); case SOCKET_TCP_BIND_START: - return socketTcpBindStart(id, payload); + return socketTcpBindStart(id, payload.localAddress, payload.family); case SOCKET_TCP_BIND_FINISH: - return socketTcpBindFinish(id); + return socketTcpFinish(id, SOCKET_STATE_BIND, SOCKET_STATE_BOUND); case SOCKET_TCP_CONNECT_START: - return socketTcpConnectStart(id, payload); + return socketTcpConnectStart(id, payload.remoteAddress, payload.family); case SOCKET_TCP_CONNECT_FINISH: - return socketTcpConnectFinish(id); + return socketTcpFinish(id, SOCKET_STATE_CONNECT, SOCKET_STATE_CONNECTION); case SOCKET_TCP_LISTEN_START: return socketTcpListenStart(id); case SOCKET_TCP_LISTEN_FINISH: - return socketTcpListenFinish(id); + return socketTcpFinish(id, SOCKET_STATE_LISTEN, SOCKET_STATE_LISTENER); case SOCKET_TCP_IS_LISTENING: return socketTcpIsListening(id); case SOCKET_TCP_SET_LISTEN_BACKLOG_SIZE: @@ -556,8 +562,7 @@ function handle(call, id, payload) { Math.min(stream.stream.readableLength, Number(payload)) ); if (res) return res; - if (stream.stream.readableEnded) - return { tag: "closed" }; + if (stream.stream.readableEnded) return { tag: "closed" }; return new Uint8Array(); } case INPUT_STREAM_BLOCKING_READ: { @@ -595,7 +600,7 @@ function handle(call, id, payload) { case OUTPUT_STREAM_CHECK_WRITE: { const { stream, pollState } = getStreamOrThrow(id); const bytes = stream.writableHighWaterMark - stream.writableLength; - if (bytes === 0) pollStateWait(pollState); + if (bytes === 0) pollState.ready = false; return BigInt(bytes); } case OUTPUT_STREAM_WRITE: { @@ -620,11 +625,11 @@ function handle(call, id, payload) { "wasi-io trap: Cannot write more than permitted writable length" ); } - pollStateWait(stream.pollState); + stream.pollState.ready = false; return (stream.flushPromise = new Promise((resolve, reject) => { stream.stream.write(payload, (err) => { stream.flushPromise = null; - pollStateReady(stream.pollState, false); + pollStateReady(stream.pollState); if (err) return void reject(streamError(err)); resolve(BigInt(payload.byteLength)); }); @@ -633,7 +638,7 @@ function handle(call, id, payload) { case OUTPUT_STREAM_FLUSH: { const stream = getStreamOrThrow(id); if (stream.flushPromise) return; - pollStateWait(stream.pollState); + stream.pollState.ready = false; return (stream.flushPromise = new Promise((resolve, reject) => { stream.stream.write(new Uint8Array([]), (err) => { stream.flushPromise = null; @@ -771,22 +776,17 @@ function handle(call, id, payload) { ); return; - case FUTURE_GET_VALUE_AND_DISPOSE: { - const future = futures.get(id); - if (!future.pollState.ready) throw undefined; - return { value: future.value, error: future.error }; - } + case FUTURE_TAKE_VALUE: + return futureTakeValue(id); + case FUTURE_SUBSCRIBE: { - const future = futures.get(id); + const { pollState } = futures.get(id); const pollId = ++pollCnt; - polls.set(pollId, future.pollState); + polls.set(pollId, pollState); return pollId; } - case FUTURE_DISPOSE: { - const future = futures.get(id); - verifyPollsDroppedForDrop(future.pollState, "future"); - return void futures.delete(id); - } + case FUTURE_DISPOSE: + return void futureDispose(id, true); default: throw new Error( `wasi-io trap: Unknown call ${call} (${reverseMap[call]}) with type ${ @@ -826,22 +826,11 @@ export function verifyPollsDroppedForDrop(pollState, polledResourceDebugName) { const poll = polls.get(pollId); if (poll) throw new Error( - `wasi-io trap: Cannot drop ${polledResourceDebugName} as it has a child poll resource which has not been dropped yet` + `wasi-io trap: Cannot drop ${polledResourceDebugName} as it has a child poll resource which have not been dropped yet` ); } } -/** - * @param {PollState} pollState - */ -export function pollStateWait(pollState) { - pollState.ready = false; - if (pollState.listener) - throw new Error( - "wasi-io trap: poll has a listener and just transitioned back to wait" - ); -} - /** * @param {PollState} pollState * @param {bool} finished @@ -873,9 +862,10 @@ function pollStateCheck(pollState) { if ( pollState.ready && stream.readableLength === 0 && - !stream.readableEnded && !stream.errored + !stream.readableEnded && + !stream.errored ) { - pollStateWait(pollState); + pollState.ready = false; stream.once("readable", () => { pollStateReady(pollState); }); @@ -883,30 +873,63 @@ function pollStateCheck(pollState) { } } -export function createFuture(promise) { +/** + * + * @param {Promise} promise + * @param {PollState | undefined} pollState + * @returns {number} + */ +export function createFuture(promise, pollState) { const futureId = ++futureCnt; - const pollState = { - ready: false, - listener: null, - polls: [], - parent: null, - }; - const future = { error: false, value: null, pollState }; - futures.set(futureId, future); + if (pollState) { + pollState.ready = false; + } else { + pollState = { + ready: false, + listener: null, + polls: [], + parent: null, + }; + } + const future = { tag: "ok", val: null }; + futures.set(futureId, { future, pollState }); promise.then( (value) => { pollStateReady(pollState); - future.value = value; + future.val = value; }, (value) => { pollStateReady(pollState); - future.error = true; - future.value = value; + future.tag = "err"; + future.val = value; } ); return futureId; } +/** + * @param {number} id + * @returns {undefined | { tag: 'ok', val: any } | { tag: 'err', val: undefined }} + * @throws {undefined} + */ +export function futureTakeValue(id) { + const future = futures.get(id); + // Not ready = return undefined + if (!future.pollState.ready) return undefined; + // Ready but already taken = return { tag: 'err', val: undefined } + if (!future.future) return { tag: "err", val: undefined }; + const out = { tag: "ok", val: future.future }; + future.future = null; + return out; +} + +export function futureDispose(id, ownsState) { + const { pollState } = futures.get(id); + if (ownsState) + verifyPollsDroppedForDrop(pollState, "future"); + return void futures.delete(id); +} + let uncaughtException; process.on("uncaughtException", (err) => (uncaughtException = err)); diff --git a/packages/preview2-shim/lib/nodejs/http.js b/packages/preview2-shim/lib/nodejs/http.js index e6fcd2567..9460134ed 100644 --- a/packages/preview2-shim/lib/nodejs/http.js +++ b/packages/preview2-shim/lib/nodejs/http.js @@ -6,7 +6,7 @@ import { HTTP_SERVER_STOP, OUTPUT_STREAM_CREATE, FUTURE_DISPOSE, - FUTURE_GET_VALUE_AND_DISPOSE, + FUTURE_TAKE_VALUE, FUTURE_SUBSCRIBE, HTTP_SERVER_SET_OUTGOING_RESPONSE, HTTP_SERVER_CLEAR_OUTGOING_RESPONSE, @@ -100,15 +100,14 @@ class FutureTrailers { return resolvedPoll; } get() { - if (this.#requested) - return { tag: "err" }; + if (this.#requested) return { tag: "err" }; this.#requested = true; return { tag: "ok", val: { tag: "ok", val: undefined, - } + }, }; } static _create() { @@ -387,37 +386,26 @@ delete IncomingResponse._create; class FutureIncomingResponse { #id; subscribe() { - if (this.#id) return pollableCreate(ioCall(FUTURE_SUBSCRIBE | HTTP, this.#id, null)); - return resolvedPoll; + return pollableCreate(ioCall(FUTURE_SUBSCRIBE | HTTP, this.#id, null)); } get() { - // already taken - if (!this.#id) return { tag: "err" }; - const ret = ioCall(FUTURE_GET_VALUE_AND_DISPOSE | HTTP, this.#id, null); - if (!ret) return; - this.#id = null; - if (ret.error) return { tag: "ok", val: { tag: "err", val: ret.value } }; - const { status, headers, bodyStreamId } = ret.value; - const textEncoder = new TextEncoder(); - return { - tag: "ok", - val: { - tag: "ok", - val: incomingResponseCreate( - status, - fieldsFromEntriesChecked( - headers.map(([key, val]) => [key, textEncoder.encode(val)]) - ), - bodyStreamId + const ret = ioCall(FUTURE_TAKE_VALUE | HTTP, this.#id, null); + if (ret === undefined) return undefined; + if (ret.tag === "ok" && ret.val.tag === "ok") { + const textEncoder = new TextEncoder(); + const { status, headers, bodyStreamId } = ret.val.val; + ret.val.val = incomingResponseCreate( + status, + fieldsFromEntriesChecked( + headers.map(([key, val]) => [key, textEncoder.encode(val)]) ), - }, - }; + bodyStreamId + ); + } + return ret; } [symbolDispose]() { - if (this.#id) { - ioCall(FUTURE_DISPOSE | HTTP, this.#id, null); - this.#id = null; - } + ioCall(FUTURE_DISPOSE | HTTP, this.#id, null); } static _create( method, diff --git a/packages/preview2-shim/lib/nodejs/sockets.js b/packages/preview2-shim/lib/nodejs/sockets.js index 876b93f0c..974f3c96f 100644 --- a/packages/preview2-shim/lib/nodejs/sockets.js +++ b/packages/preview2-shim/lib/nodejs/sockets.js @@ -2,7 +2,7 @@ import { isIP } from "node:net"; import { SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST, SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST, - SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST, + SOCKET_RESOLVE_ADDRESS_TAKE_REQUEST, SOCKET_RESOLVE_ADDRESS_SUBSCRIBE_REQUEST, SOCKET_TCP_ACCEPT, SOCKET_TCP_BIND_FINISH, @@ -47,7 +47,6 @@ import { ipv4ToTuple, ipv6ToTuple, serializeIpAddress, - isUnicastIpAddress, isWildcardAddress, } from "./sockets/socket-common.js"; @@ -119,24 +118,23 @@ class ResolveAddressStream { #error = false; resolveNextAddress() { if (!this.#data) { - ({ value: this.#data, error: this.#error } = ioCall( - SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST, - this.#id, - null - )); + const res = ioCall(SOCKET_RESOLVE_ADDRESS_TAKE_REQUEST, this.#id, null); + this.#data = res.val; + this.#error = res.tag === "err"; } if (this.#error) throw this.#data; if (this.#curItem < this.#data.length) return this.#data[this.#curItem++]; return undefined; } subscribe() { - if (this.#data) return resolvedPoll; - return pollableCreate( - ioCall(SOCKET_RESOLVE_ADDRESS_SUBSCRIBE_REQUEST, this.#id, null) - ); + if (this.#id) + return pollableCreate( + ioCall(SOCKET_RESOLVE_ADDRESS_SUBSCRIBE_REQUEST, this.#id, null) + ); + return resolvedPoll; } [symbolDispose]() { - if (!this.#data) ioCall(SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST, null, null); + if (this.#id) ioCall(SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST, this.#id, null); } static _resolveAddresses(network, name) { if (!mayDnsLookup(network)) throw "permanent-resolver-failure"; @@ -203,7 +201,7 @@ class TcpSocket { // Node.js doesn't give us the ability to detect the OS default, // therefore we hardcode the default value instead of using the OS default, // since we would never be able to report it as a return value otherwise. - // We could make this configurable as a glboal JCO implementation configuration + // We could make this configurable as a global JCO implementation configuration // instead. keepAliveIdleTime: 7200_000_000_000n, @@ -229,9 +227,10 @@ class TcpSocket { } startBind(network, localAddress) { if (!mayTcp(network)) throw "access-denied"; - if (this.#family !== localAddress.tag || !isUnicastIpAddress(localAddress)) - throw "invalid-argument"; - ioCall(SOCKET_TCP_BIND_START, this.#id, localAddress); + ioCall(SOCKET_TCP_BIND_START, this.#id, { + localAddress, + family: this.#family, + }); this.#network = network; } finishBind() { @@ -537,8 +536,7 @@ class UdpSocket { startBind(network, localAddress) { if (!mayUdp(network)) throw "access-denied"; if (this.#state !== SOCKET_STATE_INIT) throw "invalid-state"; - if (this.#family !== localAddress.tag) - throw "invalid-argument"; + if (this.#family !== localAddress.tag) throw "invalid-argument"; this.#bindOrConnectAddress = localAddress; this.#network = network; this.#state = SOCKET_STATE_BIND; @@ -594,8 +592,7 @@ class UdpSocket { } if (isWildcardAddress(remoteAddress)) throw "invalid-argument"; - if (remoteAddress.tag !== this.#family) - throw "invalid-argument"; + if (remoteAddress.tag !== this.#family) throw "invalid-argument"; if (remoteAddress.val.port === 0) throw "invalid-argument"; const host = serializeIpAddress(remoteAddress); diff --git a/packages/preview2-shim/test/test.js b/packages/preview2-shim/test/test.js index f6053e693..cc691b973 100644 --- a/packages/preview2-shim/test/test.js +++ b/packages/preview2-shim/test/test.js @@ -336,6 +336,8 @@ suite("Node.js Preview2", () => { }); ok(!pollable.ready()); + pollable.block(); + ok(pollable.ready()); const [input, output] = tcpSocket.finishConnect();