diff --git a/doc/api/async_hooks.md b/doc/api/async_hooks.md index 2d981652efdd0d..d3cb39aff977b2 100644 --- a/doc/api/async_hooks.md +++ b/doc/api/async_hooks.md @@ -682,6 +682,106 @@ never be called. * Returns: {number} The same `triggerAsyncId` that is passed to the `AsyncResource` constructor. +## Class: AsyncLocal + + + +This class can be used to set a value which follows asynchronous control flow. +An `AsyncLocal` instance is a key into a continuation local storage. +The value set on an `AsyncLocal` instance is propagated to any async +continuation triggered within this flow. Modification of the value are done via +"copy on write", therefore already created continuations are not effected by +setting a new value, only continuations created afterwards. + +The implementation relys on async hooks to follow the execution flow. Therefore +if some library is not interacting well with async hooks (e.g. it does user +space queuing) it will result in the same problems with `AsyncLocal`. To +correct this such modules should use the `AsyncResource` class. + +### Example + +```js +const http = require('http'); +const wait = require('util').promisify(setTimeout); + +const asyncLocal = new AsyncLocal(); + +function print(...args) { + console.log(`${asyncLocal.value || '-'}:`, ...args); +} + +http.createServer(async (req, res) => { + asyncLocal.value = `${req.method}:${req.url}`; + print('start'); + + setImmediate(async () => { + print('next'); + asyncLocal.value = `${asyncLocal.value}:split`; + await wait(10); + print('branched'); + }); + + await wait(100); + + print('done'); + res.end(); +}).listen(8181); +http.get('http://localhost:8181/first'); +http.get('http://localhost:8181/second'); +// Prints: +// GET:/first: start +// GET:/second: start +// GET:/second: next +// GET:/first: next +// GET:/second:split: branched +// GET:/first:split: branched +// GET:/first: done +// GET:/second: done +``` + +### new AsyncLocal(\[options\]) + +* `options` {Object} + * `onChangedCb` {Function} Optional callback invoked whenever a value of an + `AsyncLocal` changes. + +Creates a new instance of an `AsyncLocal`. Once a value is set it's propagated +to async continuations until it is cleared. + +The optional `onChangedCb` callback signals changes of the value referenced by +the `AsyncLocal` instance. The first argument is the previous value, the +second argument holds the current value and the third argument is a `true` +if this change is caused by a change of the execution context or `false` if a +new value has been assinged to `AsyncLocal.value`. + +The `onChanged` callback may be invoked frequently therefore the processing +inside the callback should be limited. In special it should be avoided to +create new asynchronous operations within the callback as this may in turn +result in followup `onChanged` invocations. + +It's not allowed to set a new `AsyncLocal.value` from the callback. + +If the `onChanged` callback called for execution context changes throws the +error handling is like in [error handling][]. For invokations caused by +setting a new value the exception goes down to the caller of the setter. + +### asyncLocal.value + +Reading this value returns the current value associated with this execution +context (execution async id). + +The value written is stored in a persistent storage and propagated for the +current asychronous execution path. Writting `null` or `undefined` clears the +value and stops further propagation on this execution path. Writing a new +value effects only the current execution context and new async operations +created afterwards. To avoid this copy on write semantic simply store an +`Object` or `Map` and update this. + +Setting a new value directly from `onChanged` callback is not allowed an will +throw an Error. + [`after` callback]: #async_hooks_after_asyncid [`before` callback]: #async_hooks_before_asyncid [`destroy` callback]: #async_hooks_destroy_asyncid @@ -691,3 +791,4 @@ never be called. [PromiseHooks]: https://docs.google.com/document/d/1rda3yKGHimKIhg5YeoAmCOtyURgsbTH_qaYR79FELlk/edit [`Worker`]: worker_threads.html#worker_threads_class_worker [promise execution tracking]: #async_hooks_promise_execution_tracking +[error handling]: #async_hooks_error_handling diff --git a/doc/api/errors.md b/doc/api/errors.md index cf21c142d6dd97..bb4d489b9bbce6 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -637,6 +637,11 @@ A special type of error that can be triggered whenever Node.js detects an exceptional logic violation that should never occur. These are raised typically by the `assert` module. + +### ERR_ASYNCLOCAL_NO_RECURSION + +An attempt was made to set an `AsyncLocal` value from `onChanged` callback. + ### ERR_ASYNC_CALLBACK diff --git a/lib/async_hooks.js b/lib/async_hooks.js index be32f6d1102bbd..8c0d2cdc7172f3 100644 --- a/lib/async_hooks.js +++ b/lib/async_hooks.js @@ -8,7 +8,9 @@ const { const { ERR_ASYNC_CALLBACK, - ERR_INVALID_ASYNC_ID + ERR_ASYNCLOCAL_NO_RECURSION, + ERR_INVALID_ASYNC_ID, + ERR_INVALID_ARG_TYPE } = require('internal/errors').codes; const { validateString } = require('internal/validators'); const internal_async_hooks = require('internal/async_hooks'); @@ -200,6 +202,139 @@ class AsyncResource { } +// AsyncLocal // + +const kStack = Symbol('stack'); +const kIsFirst = Symbol('is-first'); +const kMap = Symbol('map'); +const kOnChangedCb = Symbol('on-changed-cb'); +const kHooks = Symbol('hooks'); +const kSet = Symbol('set'); +const kInOnChangedCb = Symbol('in-on-changed-cb'); +const kInvokeOnChangedCb = Symbol('invoke-on-changed-cb'); + +class AsyncLocal { + constructor(options = {}) { + if (typeof options !== 'object' || options === null) + throw new ERR_INVALID_ARG_TYPE('options', 'Object', options); + + const { onChangedCb = null } = options; + if (onChangedCb !== null && typeof onChangedCb !== 'function') + throw new ERR_INVALID_ARG_TYPE('options.onChangedCb', + 'function', + onChangedCb); + + this[kOnChangedCb] = onChangedCb; + this[kMap] = new Map(); + + const fns = { + init: (asyncId, type, triggerAsyncId, resource) => { + // Propagate value from current id to new (execution graph) + const value = this[kMap].get(executionAsyncId()); + if (value) + this[kMap].set(asyncId, value); + }, + + destroy: (asyncId) => this[kSet](asyncId, null), + }; + + if (this[kOnChangedCb]) { + // Avoid setting a value from onChangedCb + this[kInOnChangedCb] = false; + // Change notification requires to keep a stack of async local values + this[kStack] = []; + // Indicates that first value was stored (before callback "missing") + this[kIsFirst] = true; + + // Use before/after hooks to signal changes because of execution + fns.before = (asyncId) => { + const stack = this[kStack]; + const cVal = this[kMap].get(asyncId); + const pVal = stack[stack.length - 1]; + stack.push(pVal); + if (cVal !== pVal) + this[kInvokeOnChangedCb](pVal, cVal, true); + }; + + fns.after = (asyncId) => { + const stack = this[kStack]; + const pVal = this[kMap].get(asyncId); + stack.pop(); + const cVal = stack[stack.length - 1]; + if (cVal !== pVal) + this[kInvokeOnChangedCb](pVal, cVal, true); + }; + } + this[kHooks] = createHook(fns); + } + + set value(val) { + if (this[kInOnChangedCb]) + throw new ERR_ASYNCLOCAL_NO_RECURSION(); + + val = val === null ? undefined : val; + const id = executionAsyncId(); + const onChangedCb = this[kOnChangedCb]; + let pVal; + if (onChangedCb) + pVal = this[kMap].get(id); + + this[kSet](id, val); + + if (onChangedCb && pVal !== val) + this[kInvokeOnChangedCb](pVal, val, false); + } + + get value() { + return this[kMap].get(executionAsyncId()); + } + + [kSet](id, val) { + const map = this[kMap]; + + if (val == null) { + map.delete(id); + if (map.size === 0) + this[kHooks].disable(); + + if (this[kOnChangedCb]) { + const stack = this[kStack]; + if (map.size === 0) { + // Hooks have been disabled so next set is the first one + stack.length = 0; + this[kIsFirst] = true; + } else { + stack[stack.length - 1] = undefined; + } + } + } else { + map.set(id, val); + if (map.size === 1) + this[kHooks].enable(); + + if (this[kOnChangedCb]) { + const stack = this[kStack]; + if (this[kIsFirst]) { + // First value set => "simulate" before hook + this[kIsFirst] = false; + stack.push(val); + } else { + stack[stack.length - 1] = val; + } + } + } + } + + [kInvokeOnChangedCb](p, c, e) { + try { + this[kInOnChangedCb] = true; + this[kOnChangedCb](p, c, e); + } finally { + this[kInOnChangedCb] = false; + } + } +} + // Placing all exports down here because the exported classes won't export // otherwise. module.exports = { @@ -209,4 +344,6 @@ module.exports = { triggerAsyncId, // Embedder API AsyncResource, + // CLS API + AsyncLocal }; diff --git a/lib/internal/errors.js b/lib/internal/errors.js index ad12d99c7cc49c..27ec2630b3fc85 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -721,6 +721,8 @@ module.exports = { E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError); E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError); E('ERR_ASSERTION', '%s', Error); +E('ERR_ASYNCLOCAL_NO_RECURSION', + 'Setting value from onChanged callback is not allowed', Error); E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError); E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError); E('ERR_BROTLI_INVALID_PARAM', '%s is not a valid Brotli parameter', RangeError); diff --git a/test/async-hooks/test-async-local.js b/test/async-hooks/test-async-local.js new file mode 100644 index 00000000000000..97eda0161f06c8 --- /dev/null +++ b/test/async-hooks/test-async-local.js @@ -0,0 +1,257 @@ +'use strict'; +const common = require('../common'); + +// This test verifys the AsyncLocal functionality. + +const assert = require('assert'); +const { AsyncLocal, AsyncResource } = require('async_hooks'); + +{ + common.expectsError( + () => new AsyncLocal(15), + { + code: 'ERR_INVALID_ARG_TYPE', + type: TypeError, + message: 'The "options" argument must be of type Object. ' + + 'Received type number' + } + ); + + common.expectsError( + () => new AsyncLocal({ onChangedCb: {} }), + { + code: 'ERR_INVALID_ARG_TYPE', + type: TypeError, + message: 'The "options.onChangedCb" property must be of type ' + + 'function. Received type object' + } + ); +} + +{ + const asyncLocal1 = new AsyncLocal(); + const asyncLocal2 = new AsyncLocal(); + const asyncLocal3 = new AsyncLocal(); + + assert.strictEqual(asyncLocal1.value, undefined); + asyncLocal1.value = 'one'; + asyncLocal2.value = 'two'; + asyncLocal3.value = 'three'; + + setImmediate(common.mustCall(() => { + assert.strictEqual(asyncLocal1.value, 'one'); + assert.strictEqual(asyncLocal2.value, 'two'); + assert.strictEqual(asyncLocal3.value, 'three'); + setImmediate(common.mustCall(() => { + assert.strictEqual(asyncLocal1.value, 'one'); + assert.strictEqual(asyncLocal2.value, 'two'); + assert.strictEqual(asyncLocal3.value, 'three'); + })); + asyncLocal1.value = null; + asyncLocal3.value = 'four'; + assert.strictEqual(asyncLocal1.value, undefined); + assert.strictEqual(asyncLocal2.value, 'two'); + assert.strictEqual(asyncLocal3.value, 'four'); + setImmediate(common.mustCall(() => { + assert.strictEqual(asyncLocal1.value, undefined); + assert.strictEqual(asyncLocal2.value, 'two'); + assert.strictEqual(asyncLocal3.value, 'four'); + })); + })); +} + +{ + async function asyncFunc() {} + + const asyncLocal = new AsyncLocal(); + + async function testAwait() { + asyncLocal.value = 42; + await asyncFunc(); + assert.strictEqual(asyncLocal.value, 42); + } + testAwait().then(common.mustCall(() => + assert.strictEqual(asyncLocal.value, 42) + )); +} + +{ + async function asyncFunc() {} + + let asyncLocal; + + async function testAwait() { + asyncLocal = new AsyncLocal(); + + asyncLocal.value = 42; + await asyncFunc(); + assert.strictEqual(asyncLocal.value, 42); + } + testAwait().then(common.mustCall(() => + assert.strictEqual(asyncLocal.value, 42) + )); +} + +{ + const asyncLocal = new AsyncLocal(); + const mutableObj = { a: 'b' }; + + asyncLocal.value = mutableObj; + process.nextTick(common.mustCall(() => { + assert.deepStrictEqual(mutableObj, { a: 'b', b: 'a' }); + })); + mutableObj.b = 'a'; +} + +{ + const exp = [ + [ undefined, 'foo', false ], + [ 'foo', undefined, false ], + [ undefined, 'bar', false ] + ]; + + const act = []; + const asyncLocal = new AsyncLocal({ + onChangedCb: (p, c, t) => act.push([p, c, t]) + }); + + asyncLocal.value = 'foo'; + assert.strictEqual(act.length, 1); + + asyncLocal.value = null; + assert.strictEqual(act.length, 2); + + asyncLocal.value = 'bar'; + assert.strictEqual(act.length, 3); + + assert.deepStrictEqual(act, exp); +} + +{ + const asyncLocal = new AsyncLocal(); + const asyncRes1 = new AsyncResource('Resource1'); + asyncLocal.value = 'R'; + const asyncRes2 = new AsyncResource('Resource2'); + + asyncRes1.runInAsyncScope(common.mustCall(() => { + assert.strictEqual(asyncLocal.value, undefined); + asyncRes2.runInAsyncScope(common.mustCall(() => + assert.strictEqual(asyncLocal.value, 'R') + )); + assert.strictEqual(asyncLocal.value, undefined); + })); + assert.strictEqual(asyncLocal.value, 'R'); +} + +{ + const exp = [ + [ undefined, 'foo', false ], + [ 'foo', 'bar', false ], + [ 'bar', 'foo', true ], + [ 'foo', 'bar', true ], + [ 'bar', 'foo', true ], + [ 'foo', undefined, true ], + [ undefined, 'foo', true ], + [ 'foo', undefined, true ], + [ undefined, 'bar', true ], + ]; + + const act = []; + const asyncLocal = new AsyncLocal({ + onChangedCb: (p, c, t) => act.push([p, c, t]) + }); + + process.nextTick(common.mustCall(() => { + asyncLocal.value = 'foo'; + assert.strictEqual(act.length, 1); + + const r1 = new AsyncResource('R1'); + const r2 = new AsyncResource('R2'); + + r1.runInAsyncScope(common.mustCall(() => { + asyncLocal.value = 'bar'; + assert.strictEqual(act.length, 2); + + r2.runInAsyncScope(common.mustCall(() => { + assert.strictEqual(act.length, 3); + + setImmediate(common.mustCall(() => { + assert.strictEqual(act.length, 7); + })); + })); + + setImmediate(common.mustCall(() => { + assert.strictEqual(act.length, 9); + assert.deepStrictEqual(act, exp); + })); + assert.strictEqual(act.length, 4); + })); + })); +} + +{ + const exp = [ + [ undefined, 'A', false ], + [ 'A', 'B', false ], + [ 'B', 'A', true ], + [ 'A', 'B', true ], + [ 'B', 'A', true ], + ]; + + const act = []; + const asyncLocal = new AsyncLocal({ + onChangedCb: (p, c, t) => act.push([p, c, t]) + }); + + asyncLocal.value = 'A'; + const asyncRes1 = new AsyncResource('Resource1'); + const asyncRes2 = new AsyncResource('Resource2'); + assert.strictEqual(act.length, 1); + + asyncRes1.runInAsyncScope(common.mustCall(() => { + assert.strictEqual(asyncLocal.value, 'A'); + asyncRes1.runInAsyncScope(common.mustCall(() => { + assert.strictEqual(asyncLocal.value, 'A'); + asyncRes2.runInAsyncScope(common.mustCall(() => { + assert.strictEqual(asyncLocal.value, 'A'); + asyncLocal.value = 'B'; + assert.strictEqual(act.length, 2); + asyncRes1.runInAsyncScope(common.mustCall(() => { + assert.strictEqual(asyncLocal.value, 'A'); + assert.strictEqual(act.length, 3); + })); + assert.strictEqual(act.length, 4); + assert.strictEqual(asyncLocal.value, 'B'); + })); + assert.strictEqual(act.length, 5); + assert.strictEqual(asyncLocal.value, 'A'); + })); + assert.strictEqual(asyncLocal.value, 'A'); + })); + + assert.strictEqual(act.length, 5); + + assert.deepStrictEqual(act, exp); +} + +{ + let asyncLocal; + + function onChangedCb() { + common.expectsError( + () => asyncLocal.value = 'bar', + { + code: 'ERR_ASYNCLOCAL_NO_RECURSION', + type: Error, + message: 'Setting value from onChanged callback is not allowed' + } + ); + } + + asyncLocal = new AsyncLocal({ + onChangedCb: common.mustCall(onChangedCb, 2) + }); + + asyncLocal.value = 'foo'; + asyncLocal.value = undefined; +}