Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions doc/api/v8.md
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,31 @@ added: v8.0.0
A subclass of [`Deserializer`][] corresponding to the format written by
[`DefaultSerializer`][].

## `v8.setWasmStreamingHandler(handler)`

* `handler` {object}
* `get` {function}
* `url` {string}
* `set` {function}
* `url` {string}
* `buffer` {ArrayBuffer}
* `delete` {function}
* `url`

```js
import { setWasmStreamingHandler } from 'node:v8';
import * as fs from 'node:fs/promises';

setWasmStreamingHandler({
get: (url) => fs.readFile(`./${url}.cache`),
set: (url, buffer) => fs.writeFile(`./${url}.cache`, buffer),
delete: (url) => fs.unlink(`./${url}.cache`),
});

WebAssembly.instantiateStreaming(fetch('https://example.com/some.wasm'))
.then(() => {});
```

## Promise hooks

The `promiseHooks` interface can be used to track promise lifecycle events.
Expand Down
22 changes: 22 additions & 0 deletions lib/internal/bootstrap/pre_execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,16 @@ function setupFetch() {
throw new ERR_WEBASSEMBLY_RESPONSE('body has already been used');
}

const handler = require('internal/v8').getWasmStreamingHandler();
const cached = await handler?.get(response.url);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand this, this will only work when loading WebAssembly from a remote origin, and only if the URL is stable, and only if the WebAssembly module does not change.

if (cached !== undefined && cache !== null) {
if (streamState.setCompiledModuleBytes(cached)) {
return;
} else {
await handler.delete(response.url);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, V8 has decided not to use the compiled code. That can happen, for example, when running node with certain V8 flags. However, actively deleting something from the cache means that two Node.js processes running with different command-line flags but using the same WebAssembly cache will keep filling and erasing the cache.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tniessen that's a really good point. Do you see a way to isolate this deletion?

}
}

// Pass all data from the response body to the WebAssembly compiler.
for await (const chunk of response.body) {
streamState.push(chunk);
Expand All @@ -257,6 +267,18 @@ function setupFetch() {
// and usable Response or because a network error occurred.
streamState.abort(err);
});
}, (url, buffer) => {
const handler = require('internal/v8').getWasmStreamingHandler();
try {
Promise.resolve(handler?.set(url, buffer))
.catch((e) => {
const { triggerUncaughtException } = internalBinding('errors');
triggerUncaughtException(e, true);
});
} catch (e) {
const { triggerUncaughtException } = internalBinding('errors');
triggerUncaughtException(e, false);
}
});
}

Expand Down
21 changes: 21 additions & 0 deletions lib/internal/v8.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict';

let streamingHandler;
function setWasmStreamingHandler(handler) {
if (handler !== undefined && handler !== null) {
validateObject(handler, 'handler');
validateFunction(handler.get, 'handler.get');
validateFunction(handler.set, 'handler.set');
validateFunction(handler.delete, 'handler.delete');
}
streamingHandler = handler;
}

function getWasmStreamingHandler() {
return streamingHandler;
}

module.exports = {
setWasmStreamingHandler,
getWasmStreamingHandler,
};
2 changes: 2 additions & 0 deletions lib/v8.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const {
} = internalBinding('heap_utils');
const { HeapSnapshotStream } = require('internal/heap_utils');
const promiseHooks = require('internal/promise_hooks');
const { setWasmStreamingHandler } = require('internal/v8');

/**
* Generates a snapshot of the current V8 heap
Expand Down Expand Up @@ -369,4 +370,5 @@ module.exports = {
serialize,
writeHeapSnapshot,
promiseHooks,
setWasmStreamingHandler,
};
1 change: 1 addition & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ constexpr size_t kFsStatsBufferLength =
V(udp_constructor_function, v8::Function) \
V(url_constructor_function, v8::Function) \
V(wasm_streaming_compilation_impl, v8::Function) \
V(wasm_streaming_compilation_callback, v8::Function) \
V(wasm_streaming_object_constructor, v8::Function)

class Environment;
Expand Down
84 changes: 84 additions & 0 deletions src/node_wasm_web_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ namespace wasm_web_api {

using v8::ArrayBuffer;
using v8::ArrayBufferView;
using v8::BackingStore;
using v8::CompiledWasmModule;
using v8::Context;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Local;
using v8::MaybeLocal;
using v8::Object;
using v8::OwnedBuffer;
using v8::String;
using v8::Value;
using v8::WasmStreaming;

Expand All @@ -32,6 +36,7 @@ Local<Function> WasmStreamingObject::Initialize(Environment* env) {
env->SetProtoMethod(t, "push", Push);
env->SetProtoMethod(t, "finish", Finish);
env->SetProtoMethod(t, "abort", Abort);
env->SetProtoMethod(t, "setCompiledModuleBytes", SetCompiledModuleBytes);

auto function = t->GetFunction(env->context()).ToLocalChecked();
env->set_wasm_streaming_object_constructor(function);
Expand All @@ -43,6 +48,7 @@ void WasmStreamingObject::RegisterExternalReferences(
registry->Register(Push);
registry->Register(Finish);
registry->Register(Abort);
registry->Register(SetCompiledModuleBytes);
}

void WasmStreamingObject::MemoryInfo(MemoryTracker* tracker) const {
Expand All @@ -52,6 +58,41 @@ void WasmStreamingObject::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackFieldWithSize("streaming", wasm_size_);
}

class WasmStreamingClient : public WasmStreaming::Client {
public:
explicit WasmStreamingClient(Environment* env) : env_(env) {}

private:
void OnModuleCompiled(CompiledWasmModule compiled_module) {
env_->SetImmediateThreadsafe([compiled_module](Environment* env) mutable {
OwnedBuffer owned = compiled_module.Serialize();
if (owned.size == 0) {
return;
}

std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(env->isolate(), owned.size);
unsigned char* dest = static_cast<unsigned char*>(store->Data());
memcpy(dest, &owned.buffer, owned.size);
Local<ArrayBuffer> ab = ArrayBuffer::New(env->isolate(), store);
Local<String> url =
String::NewFromUtf8(env->isolate(),
compiled_module.source_url().c_str(),
{},
compiled_module.source_url().size())
.ToLocalChecked();

Local<Value> args[] = {url, ab};
env->wasm_streaming_compilation_callback()
->Call(
env->context(), Undefined(env->isolate()), arraysize(args), args)
.ToLocalChecked();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if this throws?

});
}

Environment* env_;
};

MaybeLocal<Object> WasmStreamingObject::Create(
Environment* env, std::shared_ptr<WasmStreaming> streaming) {
Local<Function> ctor = Initialize(env);
Expand All @@ -66,6 +107,9 @@ MaybeLocal<Object> WasmStreamingObject::Create(
CHECK_NOT_NULL(ptr);
ptr->streaming_ = streaming;
ptr->wasm_size_ = 0;

ptr->streaming_->SetClient(std::make_shared<WasmStreamingClient>(env));

return obj;
}

Expand Down Expand Up @@ -111,6 +155,45 @@ void WasmStreamingObject::Push(const FunctionCallbackInfo<Value>& args) {
obj->wasm_size_ += size;
}

void WasmStreamingObject::SetCompiledModuleBytes(
const FunctionCallbackInfo<Value>& args) {
WasmStreamingObject* obj;
ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder());
CHECK(obj->streaming_);

CHECK_EQ(args.Length(), 1);
Local<Value> chunk = args[0];

// The start of the memory section backing the ArrayBuffer(View), the offset
// of the ArrayBuffer(View) within the memory section, and its size in bytes.
const void* bytes;
size_t offset;
size_t size;

if (chunk->IsArrayBufferView()) {
Local<ArrayBufferView> view = chunk.As<ArrayBufferView>();
bytes = view->Buffer()->GetBackingStore()->Data();
offset = view->ByteOffset();
size = view->ByteLength();
} else if (chunk->IsArrayBuffer()) {
Local<ArrayBuffer> buffer = chunk.As<ArrayBuffer>();
bytes = buffer->GetBackingStore()->Data();
offset = 0;
size = buffer->ByteLength();
} else {
return node::THROW_ERR_INVALID_ARG_TYPE(
Environment::GetCurrent(args),
"buffer must be an ArrayBufferView or an ArrayBuffer");
}

bool bytes_used = obj->streaming_->SetCompiledModuleBytes(
static_cast<const uint8_t*>(bytes) + offset, size);
if (bytes_used) {
obj->wasm_size_ += size;
}
args.GetReturnValue().Set(bytes_used);
}

void WasmStreamingObject::Finish(const FunctionCallbackInfo<Value>& args) {
WasmStreamingObject* obj;
ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder());
Expand Down Expand Up @@ -174,6 +257,7 @@ void StartStreamingCompilation(const FunctionCallbackInfo<Value>& info) {
void SetImplementation(const FunctionCallbackInfo<Value>& info) {
Environment* env = Environment::GetCurrent(info);
env->set_wasm_streaming_compilation_impl(info[0].As<Function>());
env->set_wasm_streaming_compilation_callback(info[1].As<Function>());
}

void Initialize(Local<Object> target,
Expand Down
2 changes: 2 additions & 0 deletions src/node_wasm_web_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class WasmStreamingObject final : public BaseObject {
static void Push(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Abort(const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetCompiledModuleBytes(
const v8::FunctionCallbackInfo<v8::Value>& args);

std::shared_ptr<v8::WasmStreaming> streaming_;
size_t wasm_size_;
Expand Down