diff --git a/ChangeLog.md b/ChangeLog.md index 72372bf25c203..bf9095975ad6b 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -25,6 +25,7 @@ Current Trunk - Add `pthread_tryjoin_np`, which is a POSIX API similar to `pthread_join` but without blocking. - New function emscripten_has_asyncify() + - Add support for pthreads in Node.js, using Node Workers. See #9745 v1.39.1: 10/30/2019 ------------------- diff --git a/src/compiler.js b/src/compiler.js index 253336ec87b81..3babfb7b99e70 100644 --- a/src/compiler.js +++ b/src/compiler.js @@ -193,10 +193,16 @@ Runtime.QUANTUM_SIZE = 4; var ENVIRONMENTS = ENVIRONMENT.split(','); ENVIRONMENT_MAY_BE_WEB = !ENVIRONMENT || ENVIRONMENTS.indexOf('web') >= 0; -ENVIRONMENT_MAY_BE_WORKER = !ENVIRONMENT || ENVIRONMENTS.indexOf('worker') >= 0; ENVIRONMENT_MAY_BE_NODE = !ENVIRONMENT || ENVIRONMENTS.indexOf('node') >= 0; ENVIRONMENT_MAY_BE_SHELL = !ENVIRONMENT || ENVIRONMENTS.indexOf('shell') >= 0; +// The worker case also includes Node.js workers when pthreads are +// enabled and Node.js is one of the supported environments for the build to +// run on. Node.js workers are detected as a combination of +// ENVIRONMENT_IS_WORKER and ENVIRONMENT_HAS_NODE. +ENVIRONMENT_MAY_BE_WORKER = !ENVIRONMENT || ENVIRONMENTS.indexOf('worker') >= 0 || + (ENVIRONMENT_MAY_BE_NODE && USE_PTHREADS); + if (ENVIRONMENT && !(ENVIRONMENT_MAY_BE_WEB || ENVIRONMENT_MAY_BE_WORKER || ENVIRONMENT_MAY_BE_NODE || ENVIRONMENT_MAY_BE_SHELL)) { throw 'Invalid environment specified in "ENVIRONMENT": ' + ENVIRONMENT + '. Should be one of: web, worker, node, shell.'; } diff --git a/src/library_pthread.js b/src/library_pthread.js index 40da916f757c7..522914e47c29b 100644 --- a/src/library_pthread.js +++ b/src/library_pthread.js @@ -432,6 +432,18 @@ var LibraryPThread = { worker.onerror = function(e) { err('pthread sent an error! ' + e.filename + ':' + e.lineno + ': ' + e.message); }; + + if (ENVIRONMENT_HAS_NODE) { + worker.on('message', function(data) { + worker.onmessage({ data: data }); + }); + worker.on('error', function(data) { + worker.onerror(data.err); + }); + worker.on('exit', function(data) { + console.log('worker exited - TODO: update the worker queue?'); + }); + } }(worker)); } // for each worker }, @@ -503,6 +515,7 @@ var LibraryPThread = { if (ENVIRONMENT_IS_PTHREAD) throw 'Internal Error! _spawn_thread() can only ever be called from main application thread!'; var worker = PThread.getNewWorker(); + if (worker.pthread !== undefined) throw 'Internal error!'; if (!threadParams.pthread_ptr) throw 'Internal error, no pthread ptr!'; PThread.runningWorkers.push(worker); @@ -944,6 +957,11 @@ var LibraryPThread = { else PThread.threadExit(status); #if WASM_BACKEND // pthread_exit is marked noReturn, so we must not return from it. + if (ENVIRONMENT_HAS_NODE) { + // exit the pthread properly on node, as a normal JS exception will halt + // the entire application. + process.exit(status); + } throw 'pthread_exit'; #endif }, diff --git a/src/node_shell_read.js b/src/node_shell_read.js new file mode 100644 index 0000000000000..6a1c07872a546 --- /dev/null +++ b/src/node_shell_read.js @@ -0,0 +1,25 @@ + read_ = function shell_read(filename, binary) { + var ret; +#if SUPPORT_BASE64_EMBEDDING + ret = tryParseAsDataURI(filename); + if (!ret) { +#endif + if (!nodeFS) nodeFS = require('fs'); + if (!nodePath) nodePath = require('path'); + filename = nodePath['normalize'](filename); + ret = nodeFS['readFileSync'](filename); +#if SUPPORT_BASE64_EMBEDDING + } +#endif + return binary ? ret : ret.toString(); + }; + + readBinary = function readBinary(filename) { + var ret = read_(filename, true); + if (!ret.buffer) { + ret = new Uint8Array(ret); + } + assert(ret.buffer); + return ret; + }; + diff --git a/src/runtime_init_memory.js b/src/runtime_init_memory.js index 004a4a007c72a..dfbcd92c46d2a 100644 --- a/src/runtime_init_memory.js +++ b/src/runtime_init_memory.js @@ -31,7 +31,13 @@ if (ENVIRONMENT_IS_PTHREAD) { #endif }); #if USE_PTHREADS - assert(wasmMemory.buffer instanceof SharedArrayBuffer, 'requested a shared WebAssembly.Memory but the returned buffer is not a SharedArrayBuffer, indicating that while the browser has SharedArrayBuffer it does not have WebAssembly threads support - you may need to set a flag'); + if (!(wasmMemory.buffer instanceof SharedArrayBuffer)) { + err('requested a shared WebAssembly.Memory but the returned buffer is not a SharedArrayBuffer, indicating that while the browser has SharedArrayBuffer it does not have WebAssembly threads support - you may need to set a flag'); + if (ENVIRONMENT_HAS_NODE) { + console.log('(on node you may need: --experimental-wasm-threads --experimental-wasm-bulk-memory and also use a recent version)'); + } + throw Error('bad memory'); + } #endif } diff --git a/src/shell.js b/src/shell.js index a28b62f49fc23..75a81a683dcc1 100644 --- a/src/shell.js +++ b/src/shell.js @@ -97,6 +97,10 @@ if (Module['ENVIRONMENT']) { var _scriptDir = import.meta.url; #else var _scriptDir = (typeof document !== 'undefined' && document.currentScript) ? document.currentScript.src : undefined; + +if (ENVIRONMENT_IS_NODE) { + _scriptDir = __filename; +} #endif #endif @@ -118,6 +122,9 @@ var read_, setWindowTitle; #if ENVIRONMENT_MAY_BE_NODE +var nodeFS; +var nodePath; + if (ENVIRONMENT_IS_NODE) { #if ENVIRONMENT #if ASSERTIONS @@ -126,35 +133,7 @@ if (ENVIRONMENT_IS_NODE) { #endif scriptDirectory = __dirname + '/'; - // Expose functionality in the same simple way that the shells work - // Note that we pollute the global namespace here, otherwise we break in node - var nodeFS; - var nodePath; - - read_ = function shell_read(filename, binary) { - var ret; -#if SUPPORT_BASE64_EMBEDDING - ret = tryParseAsDataURI(filename); - if (!ret) { -#endif - if (!nodeFS) nodeFS = require('fs'); - if (!nodePath) nodePath = require('path'); - filename = nodePath['normalize'](filename); - ret = nodeFS['readFileSync'](filename); -#if SUPPORT_BASE64_EMBEDDING - } -#endif - return binary ? ret : ret.toString(); - }; - - readBinary = function readBinary(filename) { - var ret = read_(filename, true); - if (!ret.buffer) { - ret = new Uint8Array(ret); - } - assert(ret.buffer); - return ret; - }; +#include "node_shell_read.js" if (process['argv'].length > 1) { thisProgram = process['argv'][1].replace(/\\/g, '/'); @@ -188,6 +167,18 @@ if (ENVIRONMENT_IS_NODE) { }; Module['inspect'] = function () { return '[Emscripten Module object]'; }; + +#if USE_PTHREADS + var nodeWorkerThreads; + try { + nodeWorkerThreads = require('worker_threads'); + } catch (e) { + console.error('The "worker_threads" module is not supported in this node.js build - perhaps a newer version is needed?'); + throw e; + } + Worker = nodeWorkerThreads.Worker; +#endif + } else #endif // ENVIRONMENT_MAY_BE_NODE #if ENVIRONMENT_MAY_BE_SHELL @@ -247,6 +238,10 @@ if (ENVIRONMENT_IS_SHELL) { } } else #endif // ENVIRONMENT_MAY_BE_SHELL + +// Note that this includes Node.js workers when relevant (pthreads is enabled). +// Node.js workers are detected as a combination of ENVIRONMENT_IS_WORKER and +// ENVIRONMENT_HAS_NODE. #if ENVIRONMENT_MAY_BE_WEB || ENVIRONMENT_MAY_BE_WORKER if (ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER) { if (ENVIRONMENT_IS_WORKER) { // Check worker, not web, since window could be polyfilled @@ -277,68 +272,20 @@ if (ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER) { #endif #endif - read_ = function shell_read(url) { -#if SUPPORT_BASE64_EMBEDDING - try { -#endif - var xhr = new XMLHttpRequest(); - xhr.open('GET', url, false); - xhr.send(null); - return xhr.responseText; -#if SUPPORT_BASE64_EMBEDDING - } catch (err) { - var data = tryParseAsDataURI(url); - if (data) { - return intArrayToString(data); - } - throw err; - } -#endif - }; + // Differentiate the Web Worker from the Node Worker case, as reading must + // be done differently. +#if USE_PTHREADS + if (ENVIRONMENT_HAS_NODE) { - if (ENVIRONMENT_IS_WORKER) { - readBinary = function readBinary(url) { -#if SUPPORT_BASE64_EMBEDDING - try { -#endif - var xhr = new XMLHttpRequest(); - xhr.open('GET', url, false); - xhr.responseType = 'arraybuffer'; - xhr.send(null); - return new Uint8Array(xhr.response); -#if SUPPORT_BASE64_EMBEDDING - } catch (err) { - var data = tryParseAsDataURI(url); - if (data) { - return data; - } - throw err; - } -#endif - }; - } +#include "node_shell_read.js" - readAsync = function readAsync(url, onload, onerror) { - var xhr = new XMLHttpRequest(); - xhr.open('GET', url, true); - xhr.responseType = 'arraybuffer'; - xhr.onload = function xhr_onload() { - if (xhr.status == 200 || (xhr.status == 0 && xhr.response)) { // file URLs can return 0 - onload(xhr.response); - return; - } -#if SUPPORT_BASE64_EMBEDDING - var data = tryParseAsDataURI(url); - if (data) { - onload(data.buffer); - return; - } + } else #endif - onerror(); - }; - xhr.onerror = onerror; - xhr.send(null); - }; + { + +#include "web_or_worker_shell_read.js" + + } setWindowTitle = function(title) { document.title = title }; } else @@ -349,6 +296,16 @@ if (ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER) { #endif // ASSERTIONS } +#if ENVIRONMENT_MAY_BE_NODE && USE_PTHREADS +if (ENVIRONMENT_HAS_NODE) { + // Polyfill the performance object, which emscripten pthreads support + // depends on for good timing. + if (typeof performance === 'undefined') { + performance = require('perf_hooks').performance; + } +} +#endif + // Set up the out() and err() hooks, which are how we can print to stdout or // stderr, respectively. {{{ makeModuleReceiveWithVar('out', 'print', 'console.log.bind(console)', true) }}} @@ -393,7 +350,7 @@ assert(typeof Module['setWindowTitle'] === 'undefined', 'Module.setWindowTitle o {{{ makeRemovedFSAssert('NODEFS') }}} #if USE_PTHREADS -assert(ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER, 'Pthreads do not work in non-browser environments yet (need Web Workers, or an alternative to them)'); +assert(ENVIRONMENT_IS_WEB || ENVIRONMENT_IS_WORKER || ENVIRONMENT_IS_NODE, 'Pthreads do not work in this environment yet (need Web Workers, or an alternative to them)'); #endif // USE_PTHREADS #endif // ASSERTIONS diff --git a/src/web_or_worker_shell_read.js b/src/web_or_worker_shell_read.js new file mode 100644 index 0000000000000..6c7c0560aa260 --- /dev/null +++ b/src/web_or_worker_shell_read.js @@ -0,0 +1,63 @@ + read_ = function shell_read(url) { +#if SUPPORT_BASE64_EMBEDDING + try { +#endif + var xhr = new XMLHttpRequest(); + xhr.open('GET', url, false); + xhr.send(null); + return xhr.responseText; +#if SUPPORT_BASE64_EMBEDDING + } catch (err) { + var data = tryParseAsDataURI(url); + if (data) { + return intArrayToString(data); + } + throw err; + } +#endif + }; + + if (ENVIRONMENT_IS_WORKER) { + readBinary = function readBinary(url) { +#if SUPPORT_BASE64_EMBEDDING + try { +#endif + var xhr = new XMLHttpRequest(); + xhr.open('GET', url, false); + xhr.responseType = 'arraybuffer'; + xhr.send(null); + return new Uint8Array(xhr.response); +#if SUPPORT_BASE64_EMBEDDING + } catch (err) { + var data = tryParseAsDataURI(url); + if (data) { + return data; + } + throw err; + } +#endif + }; + } + + readAsync = function readAsync(url, onload, onerror) { + var xhr = new XMLHttpRequest(); + xhr.open('GET', url, true); + xhr.responseType = 'arraybuffer'; + xhr.onload = function xhr_onload() { + if (xhr.status == 200 || (xhr.status == 0 && xhr.response)) { // file URLs can return 0 + onload(xhr.response); + return; + } +#if SUPPORT_BASE64_EMBEDDING + var data = tryParseAsDataURI(url); + if (data) { + onload(data.buffer); + return; + } +#endif + onerror(); + }; + xhr.onerror = onerror; + xhr.send(null); + }; + diff --git a/src/worker.js b/src/worker.js index 0e373cd87efa2..48f6f0a55437a 100644 --- a/src/worker.js +++ b/src/worker.js @@ -39,16 +39,6 @@ function assert(condition, text) { } #endif -// When error objects propagate from Web Worker to main thread, they lose helpful call stack and thread ID information, so print out errors early here, -// before that happens. -this.addEventListener('error', function(e) { - if (e.message.indexOf('SimulateInfiniteLoop') != -1) return e.preventDefault(); - - var errorSource = ' in ' + e.filename + ':' + e.lineno + ':' + e.colno; - console.error('Pthread ' + selfThreadId + ' uncaught exception' + (e.filename || e.lineno || e.colno ? errorSource : "") + ': ' + e.message + '. Error object:'); - console.error(e.error); -}); - function threadPrintErr() { var text = Array.prototype.slice.call(arguments).join(' '); console.error(text); @@ -274,3 +264,55 @@ this.onmessage = function(e) { throw e; } }; + +#if ENVIRONMENT_MAY_BE_NODE +// Node.js support +if (typeof process === 'object' && typeof process.versions === 'object' && typeof process.versions.node === 'string') { + // Create as web-worker-like an environment as we can. + self = { + location: { + href: __filename + } + }; + + var onmessage = this.onmessage; + + var nodeWorkerThreads = require('worker_threads'); + + Worker = nodeWorkerThreads.Worker; + + var parentPort = nodeWorkerThreads.parentPort; + + parentPort.on('message', function(data) { + onmessage({ data: data }); + }); + + var nodeFS = require('fs'); + + var nodeRead = function(filename) { + return nodeFS.readFileSync(filename).toString(); + }; + + function globalEval(x) { + global.require = require; + global.Module = Module; + eval.call(null, x); + } + + importScripts = function(f) { + globalEval(nodeRead(f)); + }; + + postMessage = function(msg) { + parentPort.postMessage(msg); + }; + + if (typeof performance === 'undefined') { + performance = { + now: function() { + return Date.now(); + } + }; + } +} +#endif // ENVIRONMENT_MAY_BE_NODE diff --git a/tests/core/pthread/create.cpp b/tests/core/pthread/create.cpp new file mode 100644 index 0000000000000..b65de1e560e4e --- /dev/null +++ b/tests/core/pthread/create.cpp @@ -0,0 +1,70 @@ +// Copyright 2019 The Emscripten Authors. All rights reserved. +// Emscripten is available under two separate licenses, the MIT license and the +// University of Illinois/NCSA Open Source License. Both these licenses can be +// found in the LICENSE file. + +#include +#include +#include +#include +#include + +#include + +#define NUM_THREADS 2 +#define TOTAL 100 + +static std::atomic sum; + +void *ThreadMain(void *arg) { + for (int i = 0; i < TOTAL; i++) { + sum++; + // wait for a change, so we see interleaved processing. + int last = sum.load(); + while (sum.load() == last) {} + } + pthread_exit((void*)TOTAL); +} + +pthread_t thread[NUM_THREADS]; + +void CreateThread(int i) +{ + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + static int counter = 1; + int rc = pthread_create(&thread[i], &attr, ThreadMain, (void*)i); + assert(rc == 0); + pthread_attr_destroy(&attr); +} + +void mainn() { + static int main_adds = 0; + int worker_adds = sum.load() - main_adds; + sum++; + main_adds++; + printf("main iter %d : %d\n", main_adds, worker_adds); + if (worker_adds == NUM_THREADS * TOTAL) { + printf("done!\n"); +#ifndef POOL + emscripten_cancel_main_loop(); +#else + exit(0); +#endif + } +} + +int main() { + // Create initial threads. + for(int i = 0; i < NUM_THREADS; ++i) { + CreateThread(i); + } + + // Without a pool, the event loop must be reached for the worker to start up. +#ifndef POOL + emscripten_set_main_loop(mainn, 0, 0); +#else + while (1) mainn(); +#endif +} diff --git a/tests/core/pthread/create.txt b/tests/core/pthread/create.txt new file mode 100644 index 0000000000000..3b17e24adf23c --- /dev/null +++ b/tests/core/pthread/create.txt @@ -0,0 +1 @@ +done! diff --git a/tests/test_core.py b/tests/test_core.py index e9b2ca1821c85..53bfd7261da04 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -150,6 +150,17 @@ def decorated(self): return decorated +def node_pthreads(f): + def decorated(self): + self.set_setting('USE_PTHREADS', 1) + if not self.is_wasm_backend(): + self.skipTest('node pthreads only supported on wasm backend') + if not self.get_setting('WASM'): + self.skipTest("pthreads doesn't work in non-wasm yet") + f(self, js_engines=[NODE_JS + ['--experimental-wasm-threads', '--experimental-wasm-bulk-memory']]) + return decorated + + # A simple check whether the compiler arguments cause optimization. def is_optimizing(args): return '-O' in str(args) and '-O0' not in args @@ -8380,6 +8391,18 @@ def test_fpic_static(self): self.emcc_args.remove('-Werror') self.do_run_in_out_file_test('tests', 'core', 'test_hello_world') + @node_pthreads + def test_pthreads_create(self, js_engines): + def test(): + self.do_run_in_out_file_test('tests', 'core', 'pthread', 'create', + js_engines=js_engines) + test() + + # with a pool, we can synchronously depend on workers being available + self.set_setting('PTHREAD_POOL_SIZE', '2') + self.emcc_args += ['-DPOOL'] + test() + # Generate tests for everything def make_run(name, emcc_args, settings=None, env=None):