Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 138 additions & 182 deletions src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ var LibraryPThread = {
schedPolicy: 0/*SCHED_OTHER*/,
schedPrio: 0
},
// Workers that have been created but uninitialized. These have already been
// parsed, but the wasm file has not yet been loaded, making it
// distinct from the unusedWorkers below. These workers will be created before
// loading wasm on the main thread.
preallocatedWorkers: [],
// Contains all Workers that are idle/unused and not currently hosting an executing pthread.
// Unused Workers can either be pooled up before page startup, but also when a pthread quits, its hosting
// Worker is not terminated, but is returned to this pool as an optimization so that starting the next thread is faster.
Expand All @@ -34,14 +29,26 @@ var LibraryPThread = {
_emscripten_register_main_browser_thread_id(PThread.mainThreadBlock);
},
initMainThreadBlock: function() {
if (ENVIRONMENT_IS_PTHREAD) return undefined;
if (ENVIRONMENT_IS_PTHREAD) return;

#if PTHREAD_POOL_SIZE > 0
var requestedPoolSize = {{{ PTHREAD_POOL_SIZE }}};
#if PTHREADS_DEBUG
out('Preallocating ' + requestedPoolSize + ' workers.');
// Start loading up the Worker pool, if requested.
for(var i = 0; i < {{{PTHREAD_POOL_SIZE}}}; ++i) {
PThread.allocateUnusedWorker();
}
#endif
PThread.preallocatedWorkers = PThread.createNewWorkers(requestedPoolSize);

// In asm.js we do not need to wait for Wasm Module to compile on the main thread, so can load
// up each Worker immediately. (in asm.js mode ignore PTHREAD_POOL_DELAY_LOAD altogether for
// simplicity, as multithreading performance optimizations are not interesting there)
#if !WASM && PTHREAD_POOL_SIZE > 0
addOnPreRun(function() { addRunDependency('pthreads'); });
var numWorkersToLoad = PThread.unusedWorkers.length;
PThread.unusedWorkers.forEach(function(w) { PThread.loadWasmModuleToWorker(w, function() {
// PTHREAD_POOL_DELAY_LOAD==0: we wanted to synchronously wait until the Worker pool
// has loaded up. If all Workers have finished loading up the Wasm Module, proceed with main()
if (!--numWorkersToLoad) removeRunDependency('pthreads');
})});
#endif

PThread.mainThreadBlock = {{{ makeStaticAlloc(C_STRUCTS.pthread.__size__) }}};
Expand Down Expand Up @@ -217,15 +224,6 @@ var LibraryPThread = {
}
PThread.pthreads = {};

for (var i = 0; i < PThread.preallocatedWorkers.length; ++i) {
var worker = PThread.preallocatedWorkers[i];
#if ASSERTIONS
assert(!worker.pthread); // This Worker should not be hosting a pthread at this time.
#endif
worker.terminate();
}
PThread.preallocatedWorkers = [];

for (var i = 0; i < PThread.unusedWorkers.length; ++i) {
var worker = PThread.unusedWorkers[i];
#if ASSERTIONS
Expand Down Expand Up @@ -281,190 +279,148 @@ var LibraryPThread = {
#endif
},

// Allocates the given amount of new web workers and stores them in the pool of unused workers.
// Loads the WebAssembly module into the given list of Workers.
// onFinishedLoading: A callback function that will be called once all of the workers have been initialized and are
// ready to host pthreads. Optional. This is used to mitigate bug https://bugzilla.mozilla.org/show_bug.cgi?id=1049079
allocateUnusedWorkers: function(numWorkers, onFinishedLoading) {
if (typeof SharedArrayBuffer === 'undefined') return; // No multithreading support, no-op.
#if PTHREADS_DEBUG
out('Allocating ' + numWorkers + ' workers for a pthread spawn pool.');
// ready to host pthreads.
loadWasmModuleToWorker: function(worker, onFinishedLoading) {
worker.onmessage = function(e) {
var d = e['data'];
var cmd = d['cmd'];
// Sometimes we need to backproxy events to the calling thread (e.g. HTML5 DOM events handlers such as emscripten_set_mousemove_callback()), so keep track in a globally accessible variable about the thread that initiated the proxying.
if (worker.pthread) PThread.currentProxiedOperationCallerThread = worker.pthread.threadInfoStruct;

// If this message is intended to a recipient that is not the main thread, forward it to the target thread.
if (d['targetThread'] && d['targetThread'] != _pthread_self()) {
var thread = PThread.pthreads[d.targetThread];
if (thread) {
thread.worker.postMessage(e.data, d['transferList']);
} else {
console.error('Internal error! Worker sent a message "' + cmd + '" to target pthread ' + d['targetThread'] + ', but that thread no longer exists!');
}
PThread.currentProxiedOperationCallerThread = undefined;
return;
}

if (cmd === 'processQueuedMainThreadWork') {
// TODO: Must post message to main Emscripten thread in PROXY_TO_WORKER mode.
_emscripten_main_thread_process_queued_calls();
} else if (cmd === 'spawnThread') {
__spawn_thread(e.data);
} else if (cmd === 'cleanupThread') {
__cleanup_thread(d['thread']);
} else if (cmd === 'killThread') {
__kill_thread(d['thread']);
} else if (cmd === 'cancelThread') {
__cancel_thread(d['thread']);
} else if (cmd === 'loaded') {
worker.loaded = true;
if (onFinishedLoading) onFinishedLoading(worker);
// If this Worker is already pending to start running a thread, launch the thread now
if (worker.runPthread) {
worker.runPthread();
delete worker.runPthread;
}
} else if (cmd === 'print') {
out('Thread ' + d['threadId'] + ': ' + d['text']);
} else if (cmd === 'printErr') {
err('Thread ' + d['threadId'] + ': ' + d['text']);
} else if (cmd === 'alert') {
alert('Thread ' + d['threadId'] + ': ' + d['text']);
} else if (cmd === 'exit') {
var detached = worker.pthread && Atomics.load(HEAPU32, (worker.pthread.thread + {{{ C_STRUCTS.pthread.detached }}}) >> 2);
if (detached) {
PThread.returnWorkerToPool(worker);
}
#if EXIT_RUNTIME // If building with -s EXIT_RUNTIME=0, no thread will post this message, so don't even compile it in.
} else if (cmd === 'exitProcess') {
// A pthread has requested to exit the whole application process (runtime).
noExitRuntime = false;
try {
exit(d['returnCode']);
} catch (e) {
if (e instanceof ExitStatus) return;
throw e;
}
#endif
} else if (cmd === 'cancelDone') {
PThread.returnWorkerToPool(worker);
} else if (cmd === 'objectTransfer') {
PThread.receiveObjectTransfer(e.data);
} else if (e.data.target === 'setimmediate') {
worker.postMessage(e.data); // Worker wants to postMessage() to itself to implement setImmediate() emulation.
} else {
err("worker sent an unknown command " + cmd);
}
PThread.currentProxiedOperationCallerThread = undefined;
};

var workers = [];
worker.onerror = function(e) {
err('pthread sent an error! ' + e.filename + ':' + e.lineno + ': ' + e.message);
};

var numWorkersToCreate = numWorkers;
if (PThread.preallocatedWorkers.length > 0) {
var workersUsed = Math.min(PThread.preallocatedWorkers.length, numWorkers);
#if PTHREADS_DEBUG
out('Using ' + workersUsed + ' preallocated workers');
#endif
workers = workers.concat(PThread.preallocatedWorkers.splice(0, workersUsed));
numWorkersToCreate -= workersUsed;
}
if (numWorkersToCreate > 0) {
workers = workers.concat(PThread.createNewWorkers(numWorkersToCreate));
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_HAS_NODE) {
worker.on('message', function(data) {
worker.onmessage({ data: data });
});
worker.on('error', function(data) {
worker.onerror(data);
});
worker.on('exit', function(data) {
console.log('worker exited - TODO: update the worker queue?');
});
}
#endif

// Add the listeners.
PThread.attachListenerToWorkers(workers, onFinishedLoading);

// Load the wasm module into the worker.
for (var i = 0; i < numWorkers; ++i) {
var worker = workers[i];

// Ask the new worker to load up the Emscripten-compiled page. This is a heavy operation.
worker.postMessage({
'cmd': 'load',
// If the application main .js file was loaded from a Blob, then it is not possible
// to access the URL of the current script that could be passed to a Web Worker so that
// it could load up the same file. In that case, developer must either deliver the Blob
// object in Module['mainScriptUrlOrBlob'], or a URL to it, so that pthread Workers can
// independently load up the same main application file.
'urlOrBlob': Module['mainScriptUrlOrBlob'] || _scriptDir,
#if ASSERTIONS && WASM
assert(wasmMemory, 'WebAssembly memory should have been loaded by now!');
assert(wasmModule, 'WebAssembly Module should have been loaded by now!');
#endif

// Ask the new worker to load up the Emscripten-compiled page. This is a heavy operation.
worker.postMessage({
'cmd': 'load',
// If the application main .js file was loaded from a Blob, then it is not possible
// to access the URL of the current script that could be passed to a Web Worker so that
// it could load up the same file. In that case, developer must either deliver the Blob
// object in Module['mainScriptUrlOrBlob'], or a URL to it, so that pthread Workers can
// independently load up the same main application file.
'urlOrBlob': Module['mainScriptUrlOrBlob'] || _scriptDir,
#if WASM
'wasmMemory': wasmMemory,
'wasmModule': wasmModule,
'wasmMemory': wasmMemory,
'wasmModule': wasmModule,
#if LOAD_SOURCE_MAP
'wasmSourceMap': wasmSourceMap,
'wasmSourceMap': wasmSourceMap,
#endif
#if USE_OFFSET_CONVERTER
'wasmOffsetConverter': wasmOffsetConverter,
'wasmOffsetConverter': wasmOffsetConverter,
#endif
#else
'buffer': HEAPU8.buffer,
'asmJsUrlOrBlob': Module["asmJsUrlOrBlob"],
'buffer': HEAPU8.buffer,
'asmJsUrlOrBlob': Module["asmJsUrlOrBlob"],
#endif
'DYNAMIC_BASE': DYNAMIC_BASE,
'DYNAMICTOP_PTR': DYNAMICTOP_PTR
});
PThread.unusedWorkers.push(worker);
}
'DYNAMIC_BASE': DYNAMIC_BASE,
'DYNAMICTOP_PTR': DYNAMICTOP_PTR
});
},

// Attaches the listeners to the given workers. If onFinishedLoading is provided,
// will call that function when all workers have been loaded. It is assumed that no worker
// is yet loaded.
attachListenerToWorkers: function(workers, onFinishedLoading) {
var numWorkersLoaded = 0;
var numWorkers = workers.length;
for (var i = 0; i < numWorkers; ++i) {
var worker = workers[i];
(function(worker) {
worker.onmessage = function(e) {
var d = e['data'];
var cmd = d['cmd'];
// Sometimes we need to backproxy events to the calling thread (e.g. HTML5 DOM events handlers such as emscripten_set_mousemove_callback()), so keep track in a globally accessible variable about the thread that initiated the proxying.
if (worker.pthread) PThread.currentProxiedOperationCallerThread = worker.pthread.threadInfoStruct;

// If this message is intended to a recipient that is not the main thread, forward it to the target thread.
if (d['targetThread'] && d['targetThread'] != _pthread_self()) {
var thread = PThread.pthreads[d.targetThread];
if (thread) {
thread.worker.postMessage(e.data, d['transferList']);
} else {
console.error('Internal error! Worker sent a message "' + cmd + '" to target pthread ' + d['targetThread'] + ', but that thread no longer exists!');
}
PThread.currentProxiedOperationCallerThread = undefined;
return;
}

if (cmd === 'processQueuedMainThreadWork') {
// TODO: Must post message to main Emscripten thread in PROXY_TO_WORKER mode.
_emscripten_main_thread_process_queued_calls();
} else if (cmd === 'spawnThread') {
__spawn_thread(e.data);
} else if (cmd === 'cleanupThread') {
__cleanup_thread(d['thread']);
} else if (cmd === 'killThread') {
__kill_thread(d['thread']);
} else if (cmd === 'cancelThread') {
__cancel_thread(d['thread']);
} else if (cmd === 'loaded') {
worker.loaded = true;
// If this Worker is already pending to start running a thread, launch the thread now
if (worker.runPthread) {
worker.runPthread();
delete worker.runPthread;
}
++numWorkersLoaded;
if (numWorkersLoaded === numWorkers && onFinishedLoading) {
onFinishedLoading();
}
} else if (cmd === 'print') {
out('Thread ' + d['threadId'] + ': ' + d['text']);
} else if (cmd === 'printErr') {
err('Thread ' + d['threadId'] + ': ' + d['text']);
} else if (cmd === 'alert') {
alert('Thread ' + d['threadId'] + ': ' + d['text']);
} else if (cmd === 'exit') {
var detached = worker.pthread && Atomics.load(HEAPU32, (worker.pthread.thread + {{{ C_STRUCTS.pthread.detached }}}) >> 2);
if (detached) {
PThread.returnWorkerToPool(worker);
}
#if EXIT_RUNTIME // If building with -s EXIT_RUNTIME=0, no thread will post this message, so don't even compile it in.
} else if (cmd === 'exitProcess') {
// A pthread has requested to exit the whole application process (runtime).
noExitRuntime = false;
try {
exit(d['returnCode']);
} catch (e) {
if (e instanceof ExitStatus) return;
throw e;
}
#endif
} else if (cmd === 'cancelDone') {
PThread.returnWorkerToPool(worker);
} else if (cmd === 'objectTransfer') {
PThread.receiveObjectTransfer(e.data);
} else if (e.data.target === 'setimmediate') {
worker.postMessage(e.data); // Worker wants to postMessage() to itself to implement setImmediate() emulation.
} else {
err("worker sent an unknown command " + cmd);
}
PThread.currentProxiedOperationCallerThread = undefined;
};

worker.onerror = function(e) {
err('pthread sent an error! ' + e.filename + ':' + e.lineno + ': ' + e.message);
};

#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_HAS_NODE) {
worker.on('message', function(data) {
worker.onmessage({ data: data });
});
worker.on('error', function(data) {
worker.onerror(data);
});
worker.on('exit', function(data) {
console.log('worker exited - TODO: update the worker queue?');
});
}
#endif
}(worker));
} // for each worker
},

createNewWorkers: function(numWorkers) {
// Creates new workers with the discovered pthread worker file.
if (typeof SharedArrayBuffer === 'undefined') return []; // No multithreading support, no-op.
#if PTHREADS_DEBUG
out('Creating ' + numWorkers + ' workers.');
#endif
var pthreadMainJs = "{{{ PTHREAD_WORKER_FILE }}}";
// Creates a new web Worker and places it in the unused worker pool to wait for its use.
allocateUnusedWorker: function() {
// Allow HTML module to configure the location where the 'worker.js' file will be loaded from,
// via Module.locateFile() function. If not specified, then the default URL 'worker.js' relative
// to the main html file is loaded.
pthreadMainJs = locateFile(pthreadMainJs);
var newWorkers = [];
for (var i = 0; i < numWorkers; ++i) {
newWorkers.push(new Worker(pthreadMainJs));
}
return newWorkers;
var pthreadMainJs = locateFile('{{{ PTHREAD_WORKER_FILE }}}');
#if PTHREADS_DEBUG
out('Allocating a new web worker from ' + pthreadMainJs);
#endif
PThread.unusedWorkers.push(new Worker(pthreadMainJs));
},

getNewWorker: function() {
if (PThread.unusedWorkers.length == 0) PThread.allocateUnusedWorkers(1);
if (PThread.unusedWorkers.length == 0) {
PThread.allocateUnusedWorker();
PThread.loadWasmModuleToWorker(PThread.unusedWorkers[0]);
}
if (PThread.unusedWorkers.length > 0) return PThread.unusedWorkers.pop();
else return null;
},
Expand Down
Loading