Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node }}
- name: Install Rust
run: rustup update stable --no-self-update && rustup default stable
- name: Install wasm32-unknown-unknown target
run: rustup target add wasm32-unknown-unknown
- name: Install wasm32-wasi target
run: rustup target add wasm32-wasi

- run: |
curl https://github.com/WebAssembly/wasi-sdk/releases/download/wasi-sdk-16/wasi-sdk-16.0-linux.tar.gz -L | tar xzvf -
echo "WASI_SDK_PATH=`pwd`/wasi-sdk-16.0" >> $GITHUB_ENV
Expand All @@ -54,9 +55,6 @@ jobs:
echo "WASI_SDK_PATH=`pwd`/wasi-sdk-16.0" >> $GITHUB_ENV
if : matrix.os == 'windows-latest'

- uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node }}
- name: Install NPM packages
run: npm install

Expand Down
16 changes: 12 additions & 4 deletions packages/preview2-shim/lib/io/calls.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ export const OUTPUT_STREAM_GET_TOTAL_BYTES = ++call_id << CALL_SHIFT;
// Io Poll
export const POLL_POLLABLE_READY = ++call_id << CALL_SHIFT;
export const POLL_POLLABLE_BLOCK = ++call_id << CALL_SHIFT;
export const POLL_POLLABLE_DISPOSE = ++call_id << CALL_SHIFT;
export const POLL_POLL_LIST = ++call_id << CALL_SHIFT;

// Futures
export const FUTURE_GET_VALUE_AND_DISPOSE = ++call_id << CALL_SHIFT;
export const FUTURE_DISPOSE = ++call_id << CALL_SHIFT;
export const FUTURE_GET_VALUE_AND_DISPOSE = ++call_id << CALL_SHIFT;
export const FUTURE_SUBSCRIBE = ++call_id << CALL_SHIFT;

// Http
export const HTTP_CREATE_REQUEST = ++call_id << 24;
Expand All @@ -69,15 +71,20 @@ export const CLOCKS_INSTANT_SUBSCRIBE = ++call_id << CALL_SHIFT;
// Sockets
// Tcp
export const SOCKET_TCP_CREATE_HANDLE = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_BIND = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_CONNECT = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_BIND_START = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_BIND_FINISH = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_CONNECT_START = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_CONNECT_FINISH = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_SUBSCRIBE = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_LISTEN = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_LISTEN_START = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_LISTEN_FINISH = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_IS_LISTENING = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_ACCEPT = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_GET_LOCAL_ADDRESS = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_GET_REMOTE_ADDRESS = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_SHUTDOWN = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_SET_KEEP_ALIVE = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_SET_LISTEN_BACKLOG_SIZE = ++call_id << CALL_SHIFT;
export const SOCKET_TCP_DISPOSE = ++call_id << CALL_SHIFT;
// Udp
export const SOCKET_UDP_CREATE_HANDLE = ++call_id << CALL_SHIFT;
Expand All @@ -96,6 +103,7 @@ export const SOCKET_UDP_SET_UNICAST_HOP_LIMIT = ++call_id << CALL_SHIFT;
// Name lookup
export const SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST = ++call_id << CALL_SHIFT;
export const SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST = ++call_id << CALL_SHIFT;
export const SOCKET_RESOLVE_ADDRESS_SUBSCRIBE_REQUEST = ++call_id << CALL_SHIFT;
export const SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST = ++call_id << CALL_SHIFT;

export const reverseMap = {};
Expand Down
35 changes: 19 additions & 16 deletions packages/preview2-shim/lib/io/worker-http.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { createStream, getStreamOrThrow } from "./worker-thread.js";
import {
createReadableStream,
getStreamOrThrow,
} from "./worker-thread.js";
import {
createServer,
request as httpRequest,
Expand Down Expand Up @@ -48,17 +51,19 @@ export async function setOutgoingResponse(
export async function startHttpServer(id, { port, host }) {
const server = createServer((req, res) => {
// create the streams and their ids
const streamId = createStream(req);
const streamId = createReadableStream(req);
const responseId = ++responseCnt;
parentPort.postMessage({
type: HTTP_SERVER_INCOMING_HANDLER,
id,
payload: {
responseId,
method: req.method,
host: req.headers.host || host || 'localhost',
host: req.headers.host || host || "localhost",
pathWithQuery: req.url,
headers: Object.entries(req.headersDistinct).flatMap(([key, val]) => val.map(val => [key, val])),
headers: Object.entries(req.headersDistinct).flatMap(([key, val]) =>
val.map((val) => [key, val])
),
streamId,
},
});
Expand Down Expand Up @@ -109,8 +114,8 @@ export async function createHttpRequest(
req = httpRequest({
agent: httpAgent,
method,
host: authority.split(':')[0],
port: authority.split(':')[1],
host: authority.split(":")[0],
port: authority.split(":")[1],
path: pathWithQuery,
timeout: connectTimeout && Number(connectTimeout),
});
Expand All @@ -119,8 +124,8 @@ export async function createHttpRequest(
req = httpsRequest({
agent: httpsAgent,
method,
host: authority.split(':')[0],
port: authority.split(':')[1],
host: authority.split(":")[0],
port: authority.split(":")[1],
path: pathWithQuery,
timeout: connectTimeout && Number(connectTimeout),
});
Expand All @@ -138,18 +143,16 @@ export async function createHttpRequest(
req.end();
}
const res = await new Promise((resolve, reject) => {
req.on("response", resolve);
req.on("close", () => reject);
req.on("error", reject);
req.once("response", resolve);
req.once("close", () => reject);
req.once("error", reject);
});
if (firstByteTimeout)
res.setTimeout(Number(firstByteTimeout));
if (firstByteTimeout) res.setTimeout(Number(firstByteTimeout));
if (betweenBytesTimeout)
res.on("readable", () => {
res.once("readable", () => {
res.setTimeout(Number(betweenBytesTimeout));
});
res.on("end", () => void res.emit("readable"));
const bodyStreamId = createStream(res);
const bodyStreamId = createReadableStream(res);
return {
status: res.statusCode,
headers: Array.from(Object.entries(res.headers)),
Expand Down
46 changes: 22 additions & 24 deletions packages/preview2-shim/lib/io/worker-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { createSyncFn } from "../synckit/index.js";
import {
CALL_MASK,
CALL_TYPE_MASK,
HTTP_SERVER_INCOMING_HANDLER,
INPUT_STREAM_BLOCKING_READ,
INPUT_STREAM_BLOCKING_SKIP,
INPUT_STREAM_DISPOSE,
Expand All @@ -22,13 +23,12 @@ import {
OUTPUT_STREAM_WRITE,
POLL_POLL_LIST,
POLL_POLLABLE_BLOCK,
POLL_POLLABLE_DISPOSE,
POLL_POLLABLE_READY,
HTTP_SERVER_INCOMING_HANDLER,
reverseMap,
} from "./calls.js";
import { STDERR } from "./calls.js";

const DEBUG = false;
import { _rawDebug, exit, stderr, stdout, env } from "node:process";

const workerPath = fileURLToPath(
new URL("./worker-thread.js", import.meta.url)
Expand All @@ -40,11 +40,14 @@ export function registerIncomingHttpHandler(id, handler) {
}

const instanceId = Math.round(Math.random() * 1000).toString();
const DEBUG_DEFAULT = false;
const DEBUG =
env.JCO_DEBUG === "0" ? false : env.JCO_DEBUG === "1" ? true : DEBUG_DEFAULT;

/**
* @type {(call: number, id: number | null, payload: any) -> any}
*/
export let ioCall = createSyncFn(workerPath, (type, id, payload) => {
export let ioCall = createSyncFn(workerPath, DEBUG, (type, id, payload) => {
// 'callbacks' from the worker
// ONLY happens for an http server incoming handler, and NOTHING else (not even sockets, since accept is sync!)
if (type !== HTTP_SERVER_INCOMING_HANDLER)
Expand All @@ -65,7 +68,7 @@ if (DEBUG) {
throw new Error("id must be a number or null");
let ret;
try {
console.error(
_rawDebug(
instanceId,
reverseMap[num & CALL_MASK],
reverseMap[num & CALL_TYPE_MASK],
Expand All @@ -78,7 +81,7 @@ if (DEBUG) {
ret = e;
throw ret;
} finally {
console.error(instanceId, "->", ret);
_rawDebug(instanceId, "->", ret);
}
};
}
Expand Down Expand Up @@ -107,16 +110,13 @@ function streamIoErrorCall(call, id, payload) {
}
// any invalid error is a trap
console.trace(e);
process.exit(1);
exit(1);
}
}

class InputStream {
#id;
#streamType;
get _id() {
return this.#id;
}
read(len) {
return streamIoErrorCall(
INPUT_STREAM_READ | this.#streamType,
Expand Down Expand Up @@ -176,9 +176,6 @@ delete InputStream._id;
class OutputStream {
#id;
#streamType;
get _id() {
return this.#id;
}
checkWrite(len) {
return streamIoErrorCall(
OUTPUT_STREAM_CHECK_WRITE | this.#streamType,
Expand All @@ -196,8 +193,7 @@ class OutputStream {
}
blockingWriteAndFlush(buf) {
if (this.#streamType <= STDERR) {
const stream =
this.#streamType === STDERR ? process.stderr : process.stdout;
const stream = this.#streamType === STDERR ? stderr : stdout;
return void stream.write(buf);
}
return streamIoErrorCall(
Expand Down Expand Up @@ -279,16 +275,20 @@ export const streams = { InputStream, OutputStream };

class Pollable {
#id;
get _id() {
return this.#id;
}
ready() {
if (this.#id === 0) return true;
return ioCall(POLL_POLLABLE_READY, this.#id);
}
block() {
if (this.#id === 0) return;
ioCall(POLL_POLLABLE_BLOCK, this.#id);
if (this.#id !== 0) {
ioCall(POLL_POLLABLE_BLOCK, this.#id);
}
}
[symbolDispose]() {
if (this.#id !== 0) {
ioCall(POLL_POLLABLE_DISPOSE, this.#id);
this.#id = 0;
}
}
static _getId(pollable) {
return pollable.#id;
Expand All @@ -303,6 +303,8 @@ class Pollable {
export const pollableCreate = Pollable._create;
delete Pollable._create;

export const resolvedPoll = pollableCreate(0);

const pollableGetId = Pollable._getId;
delete Pollable._getId;

Expand All @@ -313,10 +315,6 @@ export const poll = {
},
};

export function resolvedPoll() {
return pollableCreate(0);
}

export function createPoll(call, id, initPayload) {
return pollableCreate(ioCall(call, id, initPayload));
}
Loading