From f9e0c4d5d22bb9a1bc2455c6f862e045381d2b7c Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Sat, 23 Apr 2022 19:02:11 -0500 Subject: [PATCH 1/2] src,v8: add wasm streaming handler api --- lib/internal/bootstrap/pre_execution.js | 17 +++++ lib/internal/v8.js | 21 +++++++ lib/v8.js | 2 + src/env.h | 1 + src/node_wasm_web_api.cc | 84 +++++++++++++++++++++++++ src/node_wasm_web_api.h | 2 + 6 files changed, 127 insertions(+) create mode 100644 lib/internal/v8.js diff --git a/lib/internal/bootstrap/pre_execution.js b/lib/internal/bootstrap/pre_execution.js index e1b882b6dbe744..3724e9fb9aca20 100644 --- a/lib/internal/bootstrap/pre_execution.js +++ b/lib/internal/bootstrap/pre_execution.js @@ -245,6 +245,16 @@ function setupFetch() { throw new ERR_WEBASSEMBLY_RESPONSE('body has already been used'); } + const handler = require('internal/v8').getWasmStreamingHandler(); + const cached = handler?.get(response.url); + if (cached !== undefined && cache !== null) { + if (streamState.setCompiledModuleBytes(cached)) { + return; + } else { + handler.delete(response.url); + } + } + // Pass all data from the response body to the WebAssembly compiler. for await (const chunk of response.body) { streamState.push(chunk); @@ -257,6 +267,13 @@ function setupFetch() { // and usable Response or because a network error occurred. streamState.abort(err); }); + }, (url, buffer) => { + const handler = require('internal/v8').getWasmStreamingHandler(); + try { + handler?.set(url, buffer); + } catch (e) { + triggerUncaughtException(e, false); + } }); } diff --git a/lib/internal/v8.js b/lib/internal/v8.js new file mode 100644 index 00000000000000..38d9a0c79bfb2a --- /dev/null +++ b/lib/internal/v8.js @@ -0,0 +1,21 @@ +'use strict'; + +let streamingHandler; +function setWasmStreamingHandler(handler) { + if (handler !== undefined && handler !== null) { + validateObject(handler, 'handler'); + validateFunction(handler.get, 'handler.get'); + validateFunction(handler.set, 'handler.set'); + validateFunction(handler.delete, 'handler.delete'); + } + streamingHandler = handler; +} + +function getWasmStreamingHandler() { + return streamingHandler; +} + +module.exports = { + setWasmStreamingHandler, + getWasmStreamingHandler, +}; diff --git a/lib/v8.js b/lib/v8.js index e899da46849d9e..ce90a633325c95 100644 --- a/lib/v8.js +++ b/lib/v8.js @@ -58,6 +58,7 @@ const { } = internalBinding('heap_utils'); const { HeapSnapshotStream } = require('internal/heap_utils'); const promiseHooks = require('internal/promise_hooks'); +const { setWasmStreamingHandler } = require('internal/v8'); /** * Generates a snapshot of the current V8 heap @@ -369,4 +370,5 @@ module.exports = { serialize, writeHeapSnapshot, promiseHooks, + setWasmStreamingHandler, }; diff --git a/src/env.h b/src/env.h index 7e35833e45bd25..b4c7c443a8d204 100644 --- a/src/env.h +++ b/src/env.h @@ -552,6 +552,7 @@ constexpr size_t kFsStatsBufferLength = V(udp_constructor_function, v8::Function) \ V(url_constructor_function, v8::Function) \ V(wasm_streaming_compilation_impl, v8::Function) \ + V(wasm_streaming_compilation_callback, v8::Function) \ V(wasm_streaming_object_constructor, v8::Function) class Environment; diff --git a/src/node_wasm_web_api.cc b/src/node_wasm_web_api.cc index b23096120b1121..9a5adc0db58e96 100644 --- a/src/node_wasm_web_api.cc +++ b/src/node_wasm_web_api.cc @@ -8,6 +8,8 @@ namespace wasm_web_api { using v8::ArrayBuffer; using v8::ArrayBufferView; +using v8::BackingStore; +using v8::CompiledWasmModule; using v8::Context; using v8::Function; using v8::FunctionCallbackInfo; @@ -15,6 +17,8 @@ using v8::FunctionTemplate; using v8::Local; using v8::MaybeLocal; using v8::Object; +using v8::OwnedBuffer; +using v8::String; using v8::Value; using v8::WasmStreaming; @@ -32,6 +36,7 @@ Local WasmStreamingObject::Initialize(Environment* env) { env->SetProtoMethod(t, "push", Push); env->SetProtoMethod(t, "finish", Finish); env->SetProtoMethod(t, "abort", Abort); + env->SetProtoMethod(t, "setCompiledModuleBytes", SetCompiledModuleBytes); auto function = t->GetFunction(env->context()).ToLocalChecked(); env->set_wasm_streaming_object_constructor(function); @@ -43,6 +48,7 @@ void WasmStreamingObject::RegisterExternalReferences( registry->Register(Push); registry->Register(Finish); registry->Register(Abort); + registry->Register(SetCompiledModuleBytes); } void WasmStreamingObject::MemoryInfo(MemoryTracker* tracker) const { @@ -52,6 +58,41 @@ void WasmStreamingObject::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackFieldWithSize("streaming", wasm_size_); } +class WasmStreamingClient : public WasmStreaming::Client { + public: + explicit WasmStreamingClient(Environment* env) : env_(env) {} + + private: + void OnModuleCompiled(CompiledWasmModule compiled_module) { + env_->SetImmediateThreadsafe([compiled_module](Environment* env) mutable { + OwnedBuffer owned = compiled_module.Serialize(); + if (owned.size == 0) { + return; + } + + std::shared_ptr store = + ArrayBuffer::NewBackingStore(env->isolate(), owned.size); + unsigned char* dest = static_cast(store->Data()); + memcpy(dest, &owned.buffer, owned.size); + Local ab = ArrayBuffer::New(env->isolate(), store); + Local url = + String::NewFromUtf8(env->isolate(), + compiled_module.source_url().c_str(), + {}, + compiled_module.source_url().size()) + .ToLocalChecked(); + + Local args[] = {url, ab}; + env->wasm_streaming_compilation_callback() + ->Call( + env->context(), Undefined(env->isolate()), arraysize(args), args) + .ToLocalChecked(); + }); + } + + Environment* env_; +}; + MaybeLocal WasmStreamingObject::Create( Environment* env, std::shared_ptr streaming) { Local ctor = Initialize(env); @@ -66,6 +107,9 @@ MaybeLocal WasmStreamingObject::Create( CHECK_NOT_NULL(ptr); ptr->streaming_ = streaming; ptr->wasm_size_ = 0; + + ptr->streaming_->SetClient(std::make_shared(env)); + return obj; } @@ -111,6 +155,45 @@ void WasmStreamingObject::Push(const FunctionCallbackInfo& args) { obj->wasm_size_ += size; } +void WasmStreamingObject::SetCompiledModuleBytes( + const FunctionCallbackInfo& args) { + WasmStreamingObject* obj; + ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder()); + CHECK(obj->streaming_); + + CHECK_EQ(args.Length(), 1); + Local chunk = args[0]; + + // The start of the memory section backing the ArrayBuffer(View), the offset + // of the ArrayBuffer(View) within the memory section, and its size in bytes. + const void* bytes; + size_t offset; + size_t size; + + if (LIKELY(chunk->IsArrayBufferView())) { + Local view = chunk.As(); + bytes = view->Buffer()->GetBackingStore()->Data(); + offset = view->ByteOffset(); + size = view->ByteLength(); + } else if (LIKELY(chunk->IsArrayBuffer())) { + Local buffer = chunk.As(); + bytes = buffer->GetBackingStore()->Data(); + offset = 0; + size = buffer->ByteLength(); + } else { + return node::THROW_ERR_INVALID_ARG_TYPE( + Environment::GetCurrent(args), + "chunk must be an ArrayBufferView or an ArrayBuffer"); + } + + bool bytes_used = obj->streaming_->SetCompiledModuleBytes( + static_cast(bytes) + offset, size); + if (bytes_used) { + obj->wasm_size_ += size; + } + args.GetReturnValue().Set(bytes_used); +} + void WasmStreamingObject::Finish(const FunctionCallbackInfo& args) { WasmStreamingObject* obj; ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder()); @@ -174,6 +257,7 @@ void StartStreamingCompilation(const FunctionCallbackInfo& info) { void SetImplementation(const FunctionCallbackInfo& info) { Environment* env = Environment::GetCurrent(info); env->set_wasm_streaming_compilation_impl(info[0].As()); + env->set_wasm_streaming_compilation_callback(info[1].As()); } void Initialize(Local target, diff --git a/src/node_wasm_web_api.h b/src/node_wasm_web_api.h index 9f5fe868167635..b979087eca7d14 100644 --- a/src/node_wasm_web_api.h +++ b/src/node_wasm_web_api.h @@ -36,6 +36,8 @@ class WasmStreamingObject final : public BaseObject { static void Push(const v8::FunctionCallbackInfo& args); static void Finish(const v8::FunctionCallbackInfo& args); static void Abort(const v8::FunctionCallbackInfo& args); + static void SetCompiledModuleBytes( + const v8::FunctionCallbackInfo& args); std::shared_ptr streaming_; size_t wasm_size_; From 9627235c1325ed03eb170a3f339104e5ae72cef5 Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Sat, 23 Apr 2022 19:23:18 -0500 Subject: [PATCH 2/2] stuff --- doc/api/v8.md | 25 +++++++++++++++++++++++++ lib/internal/bootstrap/pre_execution.js | 11 ++++++++--- src/node_wasm_web_api.cc | 6 +++--- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/doc/api/v8.md b/doc/api/v8.md index 03c8e0e9571d7b..f0210fc483ac83 100644 --- a/doc/api/v8.md +++ b/doc/api/v8.md @@ -600,6 +600,31 @@ added: v8.0.0 A subclass of [`Deserializer`][] corresponding to the format written by [`DefaultSerializer`][]. +## `v8.setWasmStreamingHandler(handler)` + +* `handler` {object} + * `get` {function} + * `url` {string} + * `set` {function} + * `url` {string} + * `buffer` {ArrayBuffer} + * `delete` {function} + * `url` + +```js +import { setWasmStreamingHandler } from 'node:v8'; +import * as fs from 'node:fs/promises'; + +setWasmStreamingHandler({ + get: (url) => fs.readFile(`./${url}.cache`), + set: (url, buffer) => fs.writeFile(`./${url}.cache`, buffer), + delete: (url) => fs.unlink(`./${url}.cache`), +}); + +WebAssembly.instantiateStreaming(fetch('https://example.com/some.wasm')) + .then(() => {}); +``` + ## Promise hooks The `promiseHooks` interface can be used to track promise lifecycle events. diff --git a/lib/internal/bootstrap/pre_execution.js b/lib/internal/bootstrap/pre_execution.js index 3724e9fb9aca20..08cc5807b443d8 100644 --- a/lib/internal/bootstrap/pre_execution.js +++ b/lib/internal/bootstrap/pre_execution.js @@ -246,12 +246,12 @@ function setupFetch() { } const handler = require('internal/v8').getWasmStreamingHandler(); - const cached = handler?.get(response.url); + const cached = await handler?.get(response.url); if (cached !== undefined && cache !== null) { if (streamState.setCompiledModuleBytes(cached)) { return; } else { - handler.delete(response.url); + await handler.delete(response.url); } } @@ -270,8 +270,13 @@ function setupFetch() { }, (url, buffer) => { const handler = require('internal/v8').getWasmStreamingHandler(); try { - handler?.set(url, buffer); + Promise.resolve(handler?.set(url, buffer)) + .catch((e) => { + const { triggerUncaughtException } = internalBinding('errors'); + triggerUncaughtException(e, true); + }); } catch (e) { + const { triggerUncaughtException } = internalBinding('errors'); triggerUncaughtException(e, false); } }); diff --git a/src/node_wasm_web_api.cc b/src/node_wasm_web_api.cc index 9a5adc0db58e96..ab1a3802a3b461 100644 --- a/src/node_wasm_web_api.cc +++ b/src/node_wasm_web_api.cc @@ -170,12 +170,12 @@ void WasmStreamingObject::SetCompiledModuleBytes( size_t offset; size_t size; - if (LIKELY(chunk->IsArrayBufferView())) { + if (chunk->IsArrayBufferView()) { Local view = chunk.As(); bytes = view->Buffer()->GetBackingStore()->Data(); offset = view->ByteOffset(); size = view->ByteLength(); - } else if (LIKELY(chunk->IsArrayBuffer())) { + } else if (chunk->IsArrayBuffer()) { Local buffer = chunk.As(); bytes = buffer->GetBackingStore()->Data(); offset = 0; @@ -183,7 +183,7 @@ void WasmStreamingObject::SetCompiledModuleBytes( } else { return node::THROW_ERR_INVALID_ARG_TYPE( Environment::GetCurrent(args), - "chunk must be an ArrayBufferView or an ArrayBuffer"); + "buffer must be an ArrayBufferView or an ArrayBuffer"); } bool bytes_used = obj->streaming_->SetCompiledModuleBytes(