Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,106 @@ added: v10.5.0
An arbitrary JavaScript value that contains a clone of the data passed
to this thread’s `Worker` constructor.

## worker.locks
<!-- YAML
added: REPLACEME
-->

* {LockManager}

An instance of a [`LockManager`][].

## Class: Lock
<!-- YAML
added: REPLACEME
-->

The Lock interface provides the name and mode of a previously requested lock,
which is received in the callback to [`LockManager.request()`][].

### lock.name
<!-- YAML
added: REPLACEME
-->

* {string}

The name of this lock.

### lock.mode
<!-- YAML
added: REPLACEME
-->

* {string}

The mode of this lock. Either `shared` or `exclusive`.

## Class: LockManager
<!-- YAML
added: REPLACEME
-->

The `LockManager` interface provides methods for requesting a new [`Lock`][]
object and querying for an existing `Lock` object. To get an instance of
`LockManager`, call `worker_threads.locks`.

With the exception of `AbortController` support, this implementation matches
the [browser `LockManager`][] API.

### locks.request(name[, options], callback)
<!-- YAML
added: REPLACEME
-->

* `name` {string}
* `options` {Object}
* `mode` {string} Either `'exclusive'` or `'shared'`. **Default:**
`'exclusive'`.
* `ifAvailable` {boolean} If `true`, the lock request will only be
granted if it is not already held. If it cannot be granted, the
callback will be invoked with `null` instead of a `Lock` instance.
**Default:** `false`.
* `steal` {boolean} If `true`, then any held locks with the same name will be
released, and the request will be granted, preempting any queued requests
for it. **Default:** `false`.
* `callback` {Function} The function to be invoked while the lock is acquired.
The lock will be released when the function ends, or if the function returns
a promise, when that promise settles.
* Returns: {Promise}

Requests a [`Lock`][] object with parameters specifying its name and
characteristics.

```js
worker_threads.locks.request('my_resource', async (lock) => {
// The lock was granted.
}).then(() => {
// The lock is released here.
});
```

### locks.query()
<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

Returns a Promise that resolves with a [`LockManagerSnapshot`][] which contains
information about held and pending locks.

```js
worker_threads.locks.query().then((state) => {
state.held.forEach((lock) => {
console.log(`held lock: name ${lock.name}, mode ${lock.mode}`);
});
state.pending.forEach((request) => {
console.log(`requested lock: name ${request.name}, mode ${request.mode}`);
});
});
```

## Class: MessageChannel
<!-- YAML
added: v10.5.0
Expand Down Expand Up @@ -483,10 +583,14 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`require('worker_threads').threadId`]: #worker_threads_worker_threadid
[`cluster` module]: cluster.html
[`inspector`]: inspector.html
[`Lock`]: #worker_threads_class_lock
[`LockManager`]: #worker_threads_class_lockmanager
[`LockManagerSnapshot`]: https://developer.mozilla.org/en-US/docs/Web/API/LockManagerSnapshot
[v8.serdes]: v8.html#v8_serialization_api
[`SharedArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
[Signals events]: process.html#process_signal_events
[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array
[browser `LockManager`]: https://developer.mozilla.org/en-US/docs/Web/API/LockManager
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[child processes]: child_process.html
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
Expand Down
20 changes: 19 additions & 1 deletion lib/internal/process/next_tick.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ function setupNextTick(_setupNextTick, _setupPromises) {
// runMicrotasks is used to run V8's micro task queue.
const [
tickInfo,
runMicrotasks
runMicrotasks,
enqueueMicrotask,
] = _setupNextTick(_tickCallback);

// *Must* match Environment::TickInfo::Fields in src/env.h.
Expand Down Expand Up @@ -91,6 +92,23 @@ function setupNextTick(_setupNextTick, _setupPromises) {
}
}

// Internal method to enter the true microtask queue, ensuring that
// the callback isn't called out of order with promise jobs.
function queueMicrotask(callback) {
if (typeof callback !== 'function')
throw new ERR_INVALID_CALLBACK();

enqueueMicrotask(() => {
try {
callback();
} catch (e) {
process.emit('error', e);
}
});
}

exports.queueMicrotask = queueMicrotask;

// `nextTick()` will not enqueue any callback when the process is about to
// exit since the callback would not have a chance to be executed.
function nextTick(callback) {
Expand Down
153 changes: 152 additions & 1 deletion lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ const {
const { internalBinding } = require('internal/bootstrap/loaders');
const { MessagePort, MessageChannel } = internalBinding('messaging');
const { handle_onclose } = internalBinding('symbols');
const locks = internalBinding('locks');
const { clearAsyncIdStack } = require('internal/async_hooks');
const { serializeError, deserializeError } = require('internal/error-serdes');
const DOMException = require('internal/domexception');
const nextTickUtil = require('internal/process/next_tick');

util.inherits(MessagePort, EventEmitter);

Expand Down Expand Up @@ -44,6 +47,7 @@ const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
const kStartedReading = Symbol('kStartedReading');
const kWaitingStreams = Symbol('kWaitingStreams');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kMode = Symbol('mode');

const debug = util.debuglog('worker');

Expand Down Expand Up @@ -505,12 +509,159 @@ function pipeWithoutWarning(source, dest) {
dest._maxListeners = destMaxListeners;
}

// https://wicg.github.io/web-locks/#api-lock
class Lock {
constructor() {
// eslint-disable-next-line no-restricted-syntax
throw new TypeError('Illegal constructor');
}

get name() {
return this[kName];
}

get mode() {
return this[kMode];
}
}

Object.defineProperties(Lock.prototype, {
name: { enumerable: true },
mode: { enumerable: true },
[Symbol.toStringTag]: {
value: 'Lock',
writable: false,
enumerable: false,
configurable: true,
},
});

// https://wicg.github.io/web-locks/#api-lock-manager
class LockManager {
constructor() {
// eslint-disable-next-line no-restricted-syntax
throw new TypeError('Illegal constructor');
}

// https://wicg.github.io/web-locks/#api-lock-manager-request
request(name, options, callback) {
if (callback === undefined) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, to differentiate this from a callback API can we call this ‘disposer’ and not ‘callback’

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's called "callback" in the spec.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer this to be consistent with Node.js terminology. A callback for Node.js is an error-back. This would be confusing. This would also apply to our docs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. While I generally prefer to keep naming the same or as same as possible to specs, in this case not calling this callback makes sense. Also, on the off chance that someone decided to try to wrap this in util.promisify() because it looks like a callback, it may make sense to have a custom promisify implementation for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

util.promisify makes no sense here, it already returns a promise. i will rename the variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know that util.promisify() makes no sense here, but that doesn't mean users won't try. Being defensive by providing a custom promisify implementation that skips wrapping and returns the actual promise makes sense... or, throw within the custom promisify to indicate that it doesn't make sense.

callback = options;
options = undefined;
}

// Let promise be a new promise.
let reject;
let resolve;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});

// If options was not passed, then let options be a new LockOptions
// dictionary with default members.
if (options === undefined) {
options = {
mode: 'exclusive',
ifAvailable: false,
steal: false,
};
}

if (name.startsWith('-')) {
// if name starts with U+002D HYPHEN-MINUS (-), then reject promise with a
// "NotSupportedError" DOMException.
reject(new DOMException('NotSupportedError'));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To guard against cases where we throw by accident can this be an async function and throw instead of reject here?

We can still do the new promise dance below

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, not sure we should reject with a DOMException?

} else if (options.ifAvailable === true && options.steal === true) {
// Otherwise, if both options' steal dictionary member and option's
// ifAvailable dictionary member are true, then reject promise with a
// "NotSupportedError" DOMException.
reject(new DOMException('NotSupportedError'));
} else if (options.steal === true && options.mode !== 'exclusive') {
// Otherwise, if options' steal dictionary member is true and option's
// mode dictionary member is not "exclusive", then reject promise with a
// "NotSupportedError" DOMException.
reject(new DOMException('NotSupportedError'));
} else {
// Otherwise, run these steps:

// Let request be the result of running the steps to request a lock with
// promise, the current agent, environment's id, origin, callback, name,
// options' mode dictionary member, options' ifAvailable dictionary
// member, and option's steal dictionary member.
nextTickUtil.queueMicrotask(() => {
locks.request(
promise,
(name, mode, waitingPromise, release) => {
const lock = Object.create(Lock.prototype, {
[kName]: {
value: name,
writable: false,
enumerable: false,
configurable: false,
},
[kMode]: {
value: mode === 0 ? 'shared' : 'exclusive',
writable: false,
enumerable: false,
configurable: false,
},
});

// When lock lock's waiting promise settles (fulfills or rejects),
// enqueue the following steps on the lock task queue:
waitingPromise
.finally(() => undefined)
.then(() => {
// Release the lock lock.
release();

// Resolve lock's released promise with lock's waiting promise.
resolve(waitingPromise);
});

return callback(lock);
},
name,
options.mode === 'shared' ? 0 : 1,
options.ifAvailable || false,
options.steal || false);
});
}

// Return promise.
return promise;
}

// https://wicg.github.io/web-locks/#api-lock-manager-query
query() {
return new Promise((resolve) => {
nextTickUtil.queueMicrotask(() => {
const snapshot = locks.snapshot();
resolve(snapshot);
});
});
}
}

Object.defineProperties(LockManager.prototype, {
request: { enumerable: true },
query: { enumerable: true },
[Symbol.toStringTag]: {
value: 'LockManager',
writable: false,
enumerable: false,
configurable: true,
},
});

module.exports = {
MessagePort,
MessageChannel,
threadId,
Worker,
setupChild,
isMainThread,
workerStdio
workerStdio,
LockManager,
};
6 changes: 4 additions & 2 deletions lib/worker_threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const {
MessagePort,
MessageChannel,
threadId,
Worker
Worker,
LockManager,
} = require('internal/worker');

module.exports = {
Expand All @@ -14,5 +15,6 @@ module.exports = {
MessageChannel,
threadId,
Worker,
parentPort: null
parentPort: null,
locks: Object.create(LockManager.prototype),
};
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@
'src/node_file.cc',
'src/node_http2.cc',
'src/node_http_parser.cc',
'src/node_locks.cc',
'src/node_messaging.cc',
'src/node_options.cc',
'src/node_os.cc',
Expand Down Expand Up @@ -413,6 +414,7 @@
'src/node_http2_state.h',
'src/node_internals.h',
'src/node_javascript.h',
'src/node_locks.h',
'src/node_messaging.h',
'src/node_mutex.h',
'src/node_options.h',
Expand Down
12 changes: 12 additions & 0 deletions src/bootstrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ void RunMicrotasks(const FunctionCallbackInfo<Value>& args) {
args.GetIsolate()->RunMicrotasks();
}

void EnqueueMicrotask(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsFunction());
args.GetIsolate()->EnqueueMicrotask(args[0].As<Function>());
}

void SetupTraceCategoryState(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsFunction());
Expand All @@ -51,9 +56,16 @@ void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
.ToLocalChecked();
run_microtasks_fn->SetName(FIXED_ONE_BYTE_STRING(isolate, "runMicrotasks"));

Local<Function> enqueue_microtasks_fn =
env->NewFunctionTemplate(EnqueueMicrotask)->GetFunction(context)
.ToLocalChecked();
enqueue_microtasks_fn->SetName(
FIXED_ONE_BYTE_STRING(isolate, "EnqueueMicrotask"));

Local<Array> ret = Array::New(isolate, 2);
ret->Set(context, 0, env->tick_info()->fields().GetJSArray()).FromJust();
ret->Set(context, 1, run_microtasks_fn).FromJust();
ret->Set(context, 2, enqueue_microtasks_fn).FromJust();

args.GetReturnValue().Set(ret);
}
Expand Down
Loading