diff --git a/lib/events.js b/lib/events.js index 4d8b5bbb1e6f3f..f9bec9a8acf4f3 100644 --- a/lib/events.js +++ b/lib/events.js @@ -22,44 +22,23 @@ 'use strict'; const { - ArrayPrototypeJoin, ArrayPrototypePop, ArrayPrototypePush, - ArrayPrototypeSlice, - ArrayPrototypeSplice, - ArrayPrototypeUnshift, - Boolean, Error, - ErrorCaptureStackTrace, - FunctionPrototypeBind, - FunctionPrototypeCall, NumberMAX_SAFE_INTEGER, - ObjectDefineProperties, - ObjectDefineProperty, ObjectGetPrototypeOf, ObjectSetPrototypeOf, Promise, PromiseReject, PromiseResolve, ReflectApply, - ReflectOwnKeys, - String, - StringPrototypeSplit, - Symbol, SymbolAsyncIterator, SymbolDispose, - SymbolFor, } = primordials; -const kRejection = SymbolFor('nodejs.rejection'); const { kEmptyObject } = require('internal/util'); -const { - inspect, - identicalSequenceRange, -} = require('internal/util/inspect'); -let spliceOne; let FixedQueue; let kFirstEventParam; let kResistStopPropagation; @@ -68,852 +47,30 @@ const { AbortError, codes: { ERR_INVALID_ARG_TYPE, - ERR_INVALID_THIS, - ERR_UNHANDLED_ERROR, }, - genericNodeError, - kEnhanceStackBeforeInspector, } = require('internal/errors'); const { validateInteger, validateAbortSignal, - validateBoolean, - validateFunction, - validateNumber, validateObject, - validateString, } = require('internal/validators'); const { addAbortListener } = require('internal/events/abort_listener'); +const { + kMaxEventTargetListeners, + kWatermarkData, +} = require('internal/events/symbols'); +const { + EventEmitter, + _getMaxListeners, +} = require('internal/events/event_emitter'); -const kCapture = Symbol('kCapture'); -const kErrorMonitor = Symbol('events.errorMonitor'); -const kShapeMode = Symbol('shapeMode'); -const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners'); -const kMaxEventTargetListenersWarned = - Symbol('events.maxEventTargetListenersWarned'); -const kWatermarkData = SymbolFor('nodejs.watermarkData'); - -let EventEmitterAsyncResource; -// The EventEmitterAsyncResource has to be initialized lazily because event.js -// is loaded so early in the bootstrap process, before async_hooks is available. -// -// This implementation was adapted straight from addaleax's -// eventemitter-asyncresource MIT-licensed userland module. -// https://github.com/addaleax/eventemitter-asyncresource -function lazyEventEmitterAsyncResource() { - if (EventEmitterAsyncResource === undefined) { - const { - AsyncResource, - } = require('async_hooks'); - - const kEventEmitter = Symbol('kEventEmitter'); - const kAsyncResource = Symbol('kAsyncResource'); - class EventEmitterReferencingAsyncResource extends AsyncResource { - /** - * @param {EventEmitter} ee - * @param {string} [type] - * @param {{ - * triggerAsyncId?: number, - * requireManualDestroy?: boolean, - * }} [options] - */ - constructor(ee, type, options) { - super(type, options); - this[kEventEmitter] = ee; - } - - /** - * @type {EventEmitter} - */ - get eventEmitter() { - if (this[kEventEmitter] === undefined) - throw new ERR_INVALID_THIS('EventEmitterReferencingAsyncResource'); - return this[kEventEmitter]; - } - } - - EventEmitterAsyncResource = - class EventEmitterAsyncResource extends EventEmitter { - /** - * @param {{ - * name?: string, - * triggerAsyncId?: number, - * requireManualDestroy?: boolean, - * }} [options] - */ - constructor(options = undefined) { - let name; - if (typeof options === 'string') { - name = options; - options = undefined; - } else { - if (new.target === EventEmitterAsyncResource) { - validateString(options?.name, 'options.name'); - } - name = options?.name || new.target.name; - } - super(options); - - this[kAsyncResource] = - new EventEmitterReferencingAsyncResource(this, name, options); - } - - /** - * @param {symbol,string} event - * @param {...any} args - * @returns {boolean} - */ - emit(event, ...args) { - if (this[kAsyncResource] === undefined) - throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); - const { asyncResource } = this; - ArrayPrototypeUnshift(args, super.emit, this, event); - return ReflectApply(asyncResource.runInAsyncScope, asyncResource, - args); - } - - /** - * @returns {void} - */ - emitDestroy() { - if (this[kAsyncResource] === undefined) - throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); - this.asyncResource.emitDestroy(); - } - - /** - * @type {number} - */ - get asyncId() { - if (this[kAsyncResource] === undefined) - throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); - return this.asyncResource.asyncId(); - } - - /** - * @type {number} - */ - get triggerAsyncId() { - if (this[kAsyncResource] === undefined) - throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); - return this.asyncResource.triggerAsyncId(); - } - - /** - * @type {EventEmitterReferencingAsyncResource} - */ - get asyncResource() { - if (this[kAsyncResource] === undefined) - throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); - return this[kAsyncResource]; - } - }; - } - return EventEmitterAsyncResource; -} - -/** - * Creates a new `EventEmitter` instance. - * @param {{ captureRejections?: boolean; }} [opts] - * @constructs {EventEmitter} - */ -function EventEmitter(opts) { - EventEmitter.init.call(this, opts); -} module.exports = EventEmitter; module.exports.addAbortListener = addAbortListener; module.exports.once = once; module.exports.on = on; module.exports.getEventListeners = getEventListeners; module.exports.getMaxListeners = getMaxListeners; -// Backwards-compat with node 0.10.x -EventEmitter.EventEmitter = EventEmitter; - -EventEmitter.usingDomains = false; - -EventEmitter.captureRejectionSymbol = kRejection; -ObjectDefineProperty(EventEmitter, 'captureRejections', { - __proto__: null, - get() { - return EventEmitter.prototype[kCapture]; - }, - set(value) { - validateBoolean(value, 'EventEmitter.captureRejections'); - - EventEmitter.prototype[kCapture] = value; - }, - enumerable: true, -}); - -ObjectDefineProperty(EventEmitter, 'EventEmitterAsyncResource', { - __proto__: null, - enumerable: true, - get: lazyEventEmitterAsyncResource, - set: undefined, - configurable: true, -}); - -EventEmitter.errorMonitor = kErrorMonitor; - -// The default for captureRejections is false -ObjectDefineProperty(EventEmitter.prototype, kCapture, { - __proto__: null, - value: false, - writable: true, - enumerable: false, -}); - -EventEmitter.prototype._events = undefined; -EventEmitter.prototype._eventsCount = 0; -EventEmitter.prototype._maxListeners = undefined; - -// By default EventEmitters will print a warning if more than 10 listeners are -// added to it. This is a useful default which helps finding memory leaks. -let defaultMaxListeners = 10; -let isEventTarget; - -function checkListener(listener) { - validateFunction(listener, 'listener'); -} - -ObjectDefineProperty(EventEmitter, 'defaultMaxListeners', { - __proto__: null, - enumerable: true, - get: function() { - return defaultMaxListeners; - }, - set: function(arg) { - validateNumber(arg, 'defaultMaxListeners', 0); - defaultMaxListeners = arg; - }, -}); - -ObjectDefineProperties(EventEmitter, { - kMaxEventTargetListeners: { - __proto__: null, - value: kMaxEventTargetListeners, - enumerable: false, - configurable: false, - writable: false, - }, - kMaxEventTargetListenersWarned: { - __proto__: null, - value: kMaxEventTargetListenersWarned, - enumerable: false, - configurable: false, - writable: false, - }, -}); - -/** - * Sets the max listeners. - * @param {number} n - * @param {EventTarget[] | EventEmitter[]} [eventTargets] - * @returns {void} - */ -EventEmitter.setMaxListeners = - function(n = defaultMaxListeners, ...eventTargets) { - validateNumber(n, 'setMaxListeners', 0); - if (eventTargets.length === 0) { - defaultMaxListeners = n; - } else { - if (isEventTarget === undefined) - isEventTarget = require('internal/event_target').isEventTarget; - - for (let i = 0; i < eventTargets.length; i++) { - const target = eventTargets[i]; - if (isEventTarget(target)) { - target[kMaxEventTargetListeners] = n; - target[kMaxEventTargetListenersWarned] = false; - } else if (typeof target.setMaxListeners === 'function') { - target.setMaxListeners(n); - } else { - throw new ERR_INVALID_ARG_TYPE( - 'eventTargets', - ['EventEmitter', 'EventTarget'], - target); - } - } - } - }; - -// If you're updating this function definition, please also update any -// re-definitions, such as the one in the Domain module (lib/domain.js). -EventEmitter.init = function(opts) { - - if (this._events === undefined || - this._events === ObjectGetPrototypeOf(this)._events) { - this._events = { __proto__: null }; - this._eventsCount = 0; - this[kShapeMode] = false; - } else { - this[kShapeMode] = true; - } - - this._maxListeners = this._maxListeners || undefined; - - - if (opts?.captureRejections) { - validateBoolean(opts.captureRejections, 'options.captureRejections'); - this[kCapture] = Boolean(opts.captureRejections); - } else { - // Assigning the kCapture property directly saves an expensive - // prototype lookup in a very sensitive hot path. - this[kCapture] = EventEmitter.prototype[kCapture]; - } -}; - -function addCatch(that, promise, type, args) { - if (!that[kCapture]) { - return; - } - - // Handle Promises/A+ spec, then could be a getter - // that throws on second use. - try { - const then = promise.then; - - if (typeof then === 'function') { - then.call(promise, undefined, function(err) { - // The callback is called with nextTick to avoid a follow-up - // rejection from this promise. - process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args); - }); - } - } catch (err) { - that.emit('error', err); - } -} - -function emitUnhandledRejectionOrErr(ee, err, type, args) { - if (typeof ee[kRejection] === 'function') { - ee[kRejection](err, type, ...args); - } else { - // We have to disable the capture rejections mechanism, otherwise - // we might end up in an infinite loop. - const prev = ee[kCapture]; - - // If the error handler throws, it is not catchable and it - // will end up in 'uncaughtException'. We restore the previous - // value of kCapture in case the uncaughtException is present - // and the exception is handled. - try { - ee[kCapture] = false; - ee.emit('error', err); - } finally { - ee[kCapture] = prev; - } - } -} - -/** - * Increases the max listeners of the event emitter. - * @param {number} n - * @returns {EventEmitter} - */ -EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) { - validateNumber(n, 'setMaxListeners', 0); - this._maxListeners = n; - return this; -}; - -function _getMaxListeners(that) { - if (that._maxListeners === undefined) - return EventEmitter.defaultMaxListeners; - return that._maxListeners; -} - -/** - * Returns the current max listener value for the event emitter. - * @returns {number} - */ -EventEmitter.prototype.getMaxListeners = function getMaxListeners() { - return _getMaxListeners(this); -}; - -function enhanceStackTrace(err, own) { - let ctorInfo = ''; - try { - const { name } = this.constructor; - if (name !== 'EventEmitter') - ctorInfo = ` on ${name} instance`; - } catch { - // Continue regardless of error. - } - const sep = `\nEmitted 'error' event${ctorInfo} at:\n`; - - const errStack = ArrayPrototypeSlice( - StringPrototypeSplit(err.stack, '\n'), 1); - const ownStack = ArrayPrototypeSlice( - StringPrototypeSplit(own.stack, '\n'), 1); - - const { len, offset } = identicalSequenceRange(ownStack, errStack); - if (len > 0) { - ArrayPrototypeSplice(ownStack, offset + 1, len - 2, - ' [... lines matching original stack trace ...]'); - } - - return err.stack + sep + ArrayPrototypeJoin(ownStack, '\n'); -} - -/** - * Synchronously calls each of the listeners registered - * for the event. - * @param {string | symbol} type - * @param {...any} [args] - * @returns {boolean} - */ -EventEmitter.prototype.emit = function emit(type, ...args) { - let doError = (type === 'error'); - - const events = this._events; - if (events !== undefined) { - if (doError && events[kErrorMonitor] !== undefined) - this.emit(kErrorMonitor, ...args); - doError = (doError && events.error === undefined); - } else if (!doError) - return false; - - // If there is no 'error' event listener then throw. - if (doError) { - let er; - if (args.length > 0) - er = args[0]; - if (er instanceof Error) { - try { - const capture = {}; - ErrorCaptureStackTrace(capture, EventEmitter.prototype.emit); - ObjectDefineProperty(er, kEnhanceStackBeforeInspector, { - __proto__: null, - value: FunctionPrototypeBind(enhanceStackTrace, this, er, capture), - configurable: true, - }); - } catch { - // Continue regardless of error. - } - - // Note: The comments on the `throw` lines are intentional, they show - // up in Node's output if this results in an unhandled exception. - throw er; // Unhandled 'error' event - } - - let stringifiedEr; - try { - stringifiedEr = inspect(er); - } catch { - stringifiedEr = er; - } - - // At least give some kind of context to the user - const err = new ERR_UNHANDLED_ERROR(stringifiedEr); - err.context = er; - throw err; // Unhandled 'error' event - } - - const handler = events[type]; - - if (handler === undefined) - return false; - - if (typeof handler === 'function') { - const result = handler.apply(this, args); - - // We check if result is undefined first because that - // is the most common case so we do not pay any perf - // penalty - if (result !== undefined && result !== null) { - addCatch(this, result, type, args); - } - } else { - const len = handler.length; - const listeners = arrayClone(handler); - for (let i = 0; i < len; ++i) { - const result = listeners[i].apply(this, args); - - // We check if result is undefined first because that - // is the most common case so we do not pay any perf - // penalty. - // This code is duplicated because extracting it away - // would make it non-inlineable. - if (result !== undefined && result !== null) { - addCatch(this, result, type, args); - } - } - } - - return true; -}; - -function _addListener(target, type, listener, prepend) { - let m; - let events; - let existing; - - checkListener(listener); - - events = target._events; - if (events === undefined) { - events = target._events = { __proto__: null }; - target._eventsCount = 0; - } else { - // To avoid recursion in the case that type === "newListener"! Before - // adding it to the listeners, first emit "newListener". - if (events.newListener !== undefined) { - target.emit('newListener', type, - listener.listener ?? listener); - - // Re-assign `events` because a newListener handler could have caused the - // this._events to be assigned to a new object - events = target._events; - } - existing = events[type]; - } - - if (existing === undefined) { - // Optimize the case of one listener. Don't need the extra array object. - events[type] = listener; - ++target._eventsCount; - } else { - if (typeof existing === 'function') { - // Adding the second element, need to change to array. - existing = events[type] = - prepend ? [listener, existing] : [existing, listener]; - // If we've already got an array, just append. - } else if (prepend) { - existing.unshift(listener); - } else { - existing.push(listener); - } - - // Check for listener leak - m = _getMaxListeners(target); - if (m > 0 && existing.length > m && !existing.warned) { - existing.warned = true; - // No error code for this since it is a Warning - const w = genericNodeError( - `Possible EventEmitter memory leak detected. ${existing.length} ${String(type)} listeners ` + - `added to ${inspect(target, { depth: -1 })}. MaxListeners is ${m}. Use emitter.setMaxListeners() to increase limit`, - { name: 'MaxListenersExceededWarning', emitter: target, type: type, count: existing.length }); - process.emitWarning(w); - } - } - - return target; -} - -/** - * Adds a listener to the event emitter. - * @param {string | symbol} type - * @param {Function} listener - * @returns {EventEmitter} - */ -EventEmitter.prototype.addListener = function addListener(type, listener) { - return _addListener(this, type, listener, false); -}; - -EventEmitter.prototype.on = EventEmitter.prototype.addListener; - -/** - * Adds the `listener` function to the beginning of - * the listeners array. - * @param {string | symbol} type - * @param {Function} listener - * @returns {EventEmitter} - */ -EventEmitter.prototype.prependListener = - function prependListener(type, listener) { - return _addListener(this, type, listener, true); - }; - -function onceWrapper() { - if (!this.fired) { - this.target.removeListener(this.type, this.wrapFn); - this.fired = true; - if (arguments.length === 0) - return this.listener.call(this.target); - return this.listener.apply(this.target, arguments); - } -} - -function _onceWrap(target, type, listener) { - const state = { fired: false, wrapFn: undefined, target, type, listener }; - const wrapped = onceWrapper.bind(state); - wrapped.listener = listener; - state.wrapFn = wrapped; - return wrapped; -} - -/** - * Adds a one-time `listener` function to the event emitter. - * @param {string | symbol} type - * @param {Function} listener - * @returns {EventEmitter} - */ -EventEmitter.prototype.once = function once(type, listener) { - checkListener(listener); - - this.on(type, _onceWrap(this, type, listener)); - return this; -}; - -/** - * Adds a one-time `listener` function to the beginning of - * the listeners array. - * @param {string | symbol} type - * @param {Function} listener - * @returns {EventEmitter} - */ -EventEmitter.prototype.prependOnceListener = - function prependOnceListener(type, listener) { - checkListener(listener); - - this.prependListener(type, _onceWrap(this, type, listener)); - return this; - }; - -/** - * Removes the specified `listener` from the listeners array. - * @param {string | symbol} type - * @param {Function} listener - * @returns {EventEmitter} - */ -EventEmitter.prototype.removeListener = - function removeListener(type, listener) { - checkListener(listener); - - const events = this._events; - if (events === undefined) - return this; - - const list = events[type]; - if (list === undefined) - return this; - - if (list === listener || list.listener === listener) { - this._eventsCount -= 1; - - if (this[kShapeMode]) { - events[type] = undefined; - } else if (this._eventsCount === 0) { - this._events = { __proto__: null }; - } else { - delete events[type]; - if (events.removeListener) - this.emit('removeListener', type, list.listener || listener); - } - } else if (typeof list !== 'function') { - let position = -1; - - for (let i = list.length - 1; i >= 0; i--) { - if (list[i] === listener || list[i].listener === listener) { - position = i; - break; - } - } - - if (position < 0) - return this; - - if (position === 0) - list.shift(); - else { - if (spliceOne === undefined) - spliceOne = require('internal/util').spliceOne; - spliceOne(list, position); - } - - if (list.length === 1) - events[type] = list[0]; - - if (events.removeListener !== undefined) - this.emit('removeListener', type, listener); - } - - return this; - }; - -EventEmitter.prototype.off = EventEmitter.prototype.removeListener; - -/** - * Removes all listeners from the event emitter. (Only - * removes listeners for a specific event name if specified - * as `type`). - * @param {string | symbol} [type] - * @returns {EventEmitter} - */ -EventEmitter.prototype.removeAllListeners = - function removeAllListeners(type) { - const events = this._events; - if (events === undefined) - return this; - - // Not listening for removeListener, no need to emit - if (events.removeListener === undefined) { - if (arguments.length === 0) { - this._events = { __proto__: null }; - this._eventsCount = 0; - } else if (events[type] !== undefined) { - if (--this._eventsCount === 0) - this._events = { __proto__: null }; - else - delete events[type]; - } - this[kShapeMode] = false; - return this; - } - - // Emit removeListener for all listeners on all events - if (arguments.length === 0) { - for (const key of ReflectOwnKeys(events)) { - if (key === 'removeListener') continue; - this.removeAllListeners(key); - } - this.removeAllListeners('removeListener'); - this._events = { __proto__: null }; - this._eventsCount = 0; - this[kShapeMode] = false; - return this; - } - - const listeners = events[type]; - - if (typeof listeners === 'function') { - this.removeListener(type, listeners); - } else if (listeners !== undefined) { - // LIFO order - for (let i = listeners.length - 1; i >= 0; i--) { - this.removeListener(type, listeners[i]); - } - } - - return this; - }; - -function _listeners(target, type, unwrap) { - const events = target._events; - - if (events === undefined) - return []; - - const evlistener = events[type]; - if (evlistener === undefined) - return []; - - if (typeof evlistener === 'function') - return unwrap ? [evlistener.listener || evlistener] : [evlistener]; - - return unwrap ? - unwrapListeners(evlistener) : arrayClone(evlistener); -} - -/** - * Returns a copy of the array of listeners for the event name - * specified as `type`. - * @param {string | symbol} type - * @returns {Function[]} - */ -EventEmitter.prototype.listeners = function listeners(type) { - return _listeners(this, type, true); -}; - -/** - * Returns a copy of the array of listeners and wrappers for - * the event name specified as `type`. - * @param {string | symbol} type - * @returns {Function[]} - */ -EventEmitter.prototype.rawListeners = function rawListeners(type) { - return _listeners(this, type, false); -}; - -/** - * Returns the number of listeners listening to the event name - * specified as `type`. - * @deprecated since v3.2.0 - * @param {EventEmitter} emitter - * @param {string | symbol} type - * @returns {number} - */ -EventEmitter.listenerCount = function(emitter, type) { - if (typeof emitter.listenerCount === 'function') { - return emitter.listenerCount(type); - } - return FunctionPrototypeCall(listenerCount, emitter, type); -}; - -EventEmitter.prototype.listenerCount = listenerCount; - -/** - * Returns the number of listeners listening to event name - * specified as `type`. - * @param {string | symbol} type - * @param {Function} listener - * @returns {number} - */ -function listenerCount(type, listener) { - const events = this._events; - - if (events !== undefined) { - const evlistener = events[type]; - - if (typeof evlistener === 'function') { - if (listener != null) { - return listener === evlistener || listener === evlistener.listener ? 1 : 0; - } - - return 1; - } else if (evlistener !== undefined) { - if (listener != null) { - let matching = 0; - - for (let i = 0, l = evlistener.length; i < l; i++) { - if (evlistener[i] === listener || evlistener[i].listener === listener) { - matching++; - } - } - - return matching; - } - - return evlistener.length; - } - } - - return 0; -} - -/** - * Returns an array listing the events for which - * the emitter has registered listeners. - * @returns {any[]} - */ -EventEmitter.prototype.eventNames = function eventNames() { - return this._eventsCount > 0 ? ReflectOwnKeys(this._events) : []; -}; - -function arrayClone(arr) { - // At least since V8 8.3, this implementation is faster than the previous - // which always used a simple for-loop - switch (arr.length) { - case 2: return [arr[0], arr[1]]; - case 3: return [arr[0], arr[1], arr[2]]; - case 4: return [arr[0], arr[1], arr[2], arr[3]]; - case 5: return [arr[0], arr[1], arr[2], arr[3], arr[4]]; - case 6: return [arr[0], arr[1], arr[2], arr[3], arr[4], arr[5]]; - } - return ArrayPrototypeSlice(arr); -} - -function unwrapListeners(arr) { - const ret = arrayClone(arr); - for (let i = 0; i < ret.length; ++i) { - const orig = ret[i].listener; - if (typeof orig === 'function') - ret[i] = orig; - } - return ret; -} /** * Returns a copy of the array of listeners for the event name diff --git a/lib/internal/events/event_emitter.js b/lib/internal/events/event_emitter.js new file mode 100644 index 00000000000000..8bf856cb0c0fa6 --- /dev/null +++ b/lib/internal/events/event_emitter.js @@ -0,0 +1,700 @@ +'use strict'; + +const { + ArrayPrototypeUnshift, + Boolean, + FunctionPrototypeCall, + ObjectDefineProperties, + ObjectDefineProperty, + ObjectGetPrototypeOf, + ReflectApply, + ReflectOwnKeys, + Symbol, +} = primordials; + +const { + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_THIS, + }, +} = require('internal/errors'); + +const { + validateBoolean, + validateFunction, + validateNumber, + validateString, +} = require('internal/validators'); +const { + kCapture, + kErrorMonitor, + kShapeMode, + kMaxEventTargetListeners, + kMaxEventTargetListenersWarned, + kImpl, + kIsFastPath, + kSwitchToSlowPath, + kInitialEvents, + kRejection, +} = require('internal/events/symbols'); +const { + arrayClone, + throwErrorOnMissingErrorHandler, +} = require('internal/events/shared_internal_event_emitter'); +const { SlowEventEmitter } = require('internal/events/slow_event_emitter'); +const { FastEventEmitter, isEventUnsupportedForFastPath } = require('internal/events/fast_event_emitter'); + + +let EventEmitterAsyncResource; +// The EventEmitterAsyncResource has to be initialized lazily because event.js +// is loaded so early in the bootstrap process, before async_hooks is available. +// +// This implementation was adapted straight from addaleax's +// eventemitter-asyncresource MIT-licensed userland module. +// https://github.com/addaleax/eventemitter-asyncresource +function lazyEventEmitterAsyncResource() { + if (EventEmitterAsyncResource === undefined) { + const { + AsyncResource, + } = require('async_hooks'); + + const kEventEmitter = Symbol('kEventEmitter'); + const kAsyncResource = Symbol('kAsyncResource'); + class EventEmitterReferencingAsyncResource extends AsyncResource { + /** + * @param {EventEmitter} ee + * @param {string} [type] + * @param {{ + * triggerAsyncId?: number, + * requireManualDestroy?: boolean, + * }} [options] + */ + constructor(ee, type, options) { + super(type, options); + this[kEventEmitter] = ee; + } + + /** + * @type {EventEmitter} + */ + get eventEmitter() { + if (this[kEventEmitter] === undefined) + throw new ERR_INVALID_THIS('EventEmitterReferencingAsyncResource'); + return this[kEventEmitter]; + } + } + + EventEmitterAsyncResource = + class EventEmitterAsyncResource extends EventEmitter { + /** + * @param {{ + * name?: string, + * triggerAsyncId?: number, + * requireManualDestroy?: boolean, + * }} [options] + */ + constructor(options = undefined) { + let name; + if (typeof options === 'string') { + name = options; + options = undefined; + } else { + if (new.target === EventEmitterAsyncResource) { + validateString(options?.name, 'options.name'); + } + name = options?.name || new.target.name; + } + super(options); + + this[kAsyncResource] = + new EventEmitterReferencingAsyncResource(this, name, options); + } + + /** + * @param {symbol,string} event + * @param {...any} args + * @returns {boolean} + */ + emit(event, ...args) { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + const { asyncResource } = this; + ArrayPrototypeUnshift(args, super.emit, this, event); + return ReflectApply(asyncResource.runInAsyncScope, asyncResource, + args); + } + + /** + * @returns {void} + */ + emitDestroy() { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + this.asyncResource.emitDestroy(); + } + + /** + * @type {number} + */ + get asyncId() { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + return this.asyncResource.asyncId(); + } + + /** + * @type {number} + */ + get triggerAsyncId() { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + return this.asyncResource.triggerAsyncId(); + } + + /** + * @type {EventEmitterReferencingAsyncResource} + */ + get asyncResource() { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + return this[kAsyncResource]; + } + }; + } + return EventEmitterAsyncResource; +} + +/** + * Creates a new `EventEmitter` instance. + * @param {{ captureRejections?: boolean; }} [opts] + * @constructs {EventEmitter} + */ +function EventEmitter(opts) { + EventEmitter.init.call(this, opts); +} + +module.exports = { + EventEmitter, + _getMaxListeners, +}; +// Backwards-compat with node 0.10.x +EventEmitter.EventEmitter = EventEmitter; + +EventEmitter.usingDomains = false; + +EventEmitter.captureRejectionSymbol = kRejection; +ObjectDefineProperty(EventEmitter, 'captureRejections', { + __proto__: null, + get() { + return EventEmitter.prototype[kCapture]; + }, + set(value) { + validateBoolean(value, 'EventEmitter.captureRejections'); + + EventEmitter.prototype[kCapture] = value; + }, + enumerable: true, +}); + +ObjectDefineProperty(EventEmitter, 'EventEmitterAsyncResource', { + __proto__: null, + enumerable: true, + get: lazyEventEmitterAsyncResource, + set: undefined, + configurable: true, +}); + +EventEmitter.errorMonitor = kErrorMonitor; + +// The default for captureRejections is false +ObjectDefineProperty(EventEmitter.prototype, kCapture, { + __proto__: null, + value: false, + writable: true, + enumerable: false, +}); + +ObjectDefineProperties(EventEmitter.prototype, { + [kImpl]: { + __proto__: null, + value: undefined, + enumerable: false, + configurable: false, + writable: true, + }, + [kIsFastPath]: { + __proto__: null, + value: undefined, + enumerable: false, + configurable: false, + writable: true, + }, + [kInitialEvents]: { + __proto__: null, + value: undefined, + enumerable: false, + configurable: false, + writable: true, + }, + _events: { + __proto__: null, + enumerable: true, + get: function() { + if(this[kImpl] === undefined) { + return undefined; + } + return this[kImpl]._events; + }, + set: function(arg) { + // If using the _events setter move to slow path to avoid bugs with incorrect shape or functions + // Users should not interact with _events directly + EventEmitter.prototype[kSwitchToSlowPath].call(this); + this[kImpl]._events = arg; + }, + }, + _eventsCount: { + __proto__: null, + enumerable: true, + + get: function() { + if (this[kImpl] === undefined) { + return 0; + } + + return this[kImpl]._eventsCount; + }, + set: function(arg) { + if (this[kImpl] === undefined) { + return; + } + + this[kImpl]._eventsCount = arg; + }, + }, +}); + +EventEmitter.prototype._maxListeners = undefined; + +// By default EventEmitters will print a warning if more than 10 listeners are +// added to it. This is a useful default which helps finding memory leaks. +let defaultMaxListeners = 10; +let isEventTarget; + +function checkListener(listener) { + validateFunction(listener, 'listener'); +} + +ObjectDefineProperty(EventEmitter, 'defaultMaxListeners', { + __proto__: null, + enumerable: true, + get: function() { + return defaultMaxListeners; + }, + set: function(arg) { + validateNumber(arg, 'defaultMaxListeners', 0); + defaultMaxListeners = arg; + }, +}); + +ObjectDefineProperties(EventEmitter, { + kMaxEventTargetListeners: { + __proto__: null, + value: kMaxEventTargetListeners, + enumerable: false, + configurable: false, + writable: false, + }, + kMaxEventTargetListenersWarned: { + __proto__: null, + value: kMaxEventTargetListenersWarned, + enumerable: false, + configurable: false, + writable: false, + }, +}); + +/** + * Sets the max listeners. + * @param {number} n + * @param {EventTarget[] | EventEmitter[]} [eventTargets] + * @returns {void} + */ +EventEmitter.setMaxListeners = + function(n = defaultMaxListeners, ...eventTargets) { + validateNumber(n, 'setMaxListeners', 0); + if (eventTargets.length === 0) { + defaultMaxListeners = n; + } else { + if (isEventTarget === undefined) + isEventTarget = require('internal/event_target').isEventTarget; + + for (let i = 0; i < eventTargets.length; i++) { + const target = eventTargets[i]; + if (isEventTarget(target)) { + target[kMaxEventTargetListeners] = n; + target[kMaxEventTargetListenersWarned] = false; + } else if (typeof target.setMaxListeners === 'function') { + target.setMaxListeners(n); + } else { + throw new ERR_INVALID_ARG_TYPE( + 'eventTargets', + ['EventEmitter', 'EventTarget'], + target); + } + } + } + }; + +// If you're updating this function definition, please also update any +// re-definitions, such as the one in the Domain module (lib/domain.js). +EventEmitter.init = function(opts) { + let thisPrototype; + + this._maxListeners = this._maxListeners || undefined; + + if (opts?.captureRejections) { + validateBoolean(opts.captureRejections, 'options.captureRejections'); + this[kCapture] = Boolean(opts.captureRejections); + } else { + // Assigning the kCapture property directly saves an expensive + // prototype lookup in a very sensitive hot path. + this[kCapture] = EventEmitter.prototype[kCapture]; + } + if(this[kImpl] !== undefined) { + return; + } + + if (this[kImpl] === undefined || (thisPrototype = ObjectGetPrototypeOf(this))[kImpl] === this[kImpl]) { + this[kImpl] = new FastEventEmitter(this); + this[kIsFastPath] = true; + } + + const thisEvents = this._events; + const impl = this[kImpl]; + const missingEvents = thisEvents === undefined; + + if (missingEvents && this[kInitialEvents] !== undefined) { + impl._events = this[kInitialEvents]; + this[kInitialEvents] = undefined; + impl[kShapeMode] = true; + } else if (missingEvents || + thisEvents === (thisPrototype || ObjectGetPrototypeOf(this))._events) { + // In fast path we don't want to have __proto__: null as it will cause the object to be in dictionary mode + // and will slow down the access to the properties by a lot (around 2x) + impl._events = this[kIsFastPath] ? {} : { __proto__: null }; + impl._eventsCount = 0; + impl[kShapeMode] = false; + } +}; + +EventEmitter.prototype[kSwitchToSlowPath] = function() { + if (this[kIsFastPath] === false) { + return; + } + + if (this[kIsFastPath] === true) { + this[kIsFastPath] = false; + this[kImpl] = SlowEventEmitter.fromFastEventEmitter(this[kImpl]); + return; + } + + this[kImpl] = new SlowEventEmitter(this); + this[kIsFastPath] = false; + +}; + +/** + * Increases the max listeners of the event emitter. + * @param {number} n + * @returns {EventEmitter} + */ +EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) { + validateNumber(n, 'setMaxListeners', 0); + this._maxListeners = n; + return this; +}; + +function _getMaxListeners(that) { + if (that._maxListeners === undefined) + return EventEmitter.defaultMaxListeners; + return that._maxListeners; +} +/** + * Returns the current max listener value for the event emitter. + * @returns {number} + */ +EventEmitter.prototype.getMaxListeners = function getMaxListeners() { + return _getMaxListeners(this); +}; + + +/** + * Synchronously calls each of the listeners registered + * for the event. + * @param {string | symbol} type + * @param {...any} [args] + * @returns {boolean} + */ +EventEmitter.prototype.emit = function emit(type, ...args) { + // Users can call emit in the constructor before even calling super causing this[kImpl] to be undefined + const impl = this[kImpl]; + + // The order here is important as impl === undefined is slower than type === 'error' + if (type === 'error' && impl === undefined) { + throwErrorOnMissingErrorHandler.apply(this, args); + } + + // This can happen when calling emit for function that was not listened to and we are still in the fast path + // this is to avoid calling object prototype functions + if (this[kIsFastPath] === true && isEventUnsupportedForFastPath(type) === true) { + return false; + } + + return impl !== undefined ? impl.emit.apply(impl, arguments) : false; +}; + +/** + * Adds a listener to the event emitter. + * @param {string | symbol} type + * @param {Function} listener + * @returns {EventEmitter} + */ +EventEmitter.prototype.addListener = function addListener(type, listener) { + checkListener(listener); + + // This can happen in TLSSocket, where we listen for close event before + // the EventEmitter was initiated + if (this[kImpl] === undefined) { + EventEmitter.init.apply(this); + } + + // If the listener is already added, + // switch to slow path as the fast path optimized for single listener for each event + if (this[kIsFastPath] && (isEventUnsupportedForFastPath(type) || this[kImpl].isListenerAlreadyExists(type))) { + EventEmitter.prototype[kSwitchToSlowPath].call(this); + } + + this[kImpl].addListener(type, listener); + + return this; +}; + +EventEmitter.prototype.on = EventEmitter.prototype.addListener; + +/** + * Adds the `listener` function to the beginning of + * the listeners array. + * @param {string | symbol} type + * @param {Function} listener + * @returns {EventEmitter} + */ +EventEmitter.prototype.prependListener = + function prependListener(type, listener) { + checkListener(listener); + + // This can happen in TLSSocket, where we listen for close event before + // the EventEmitter was initiated + if (this[kImpl] === undefined) { + EventEmitter.init.apply(this); + } + + // If the listener is already added, + // switch to slow path as the fast path optimized for single listener for each event + if (this[kIsFastPath] && (isEventUnsupportedForFastPath(type) || this[kImpl].isListenerAlreadyExists(type))) { + EventEmitter.prototype[kSwitchToSlowPath].call(this); + } + + this[kImpl].addListener(type, listener, true); + + return this; + }; + +function onceWrapper() { + if (!this.fired) { + this.target.removeListener(this.type, this.wrapFn); + this.fired = true; + if (arguments.length === 0) + return this.listener.call(this.target); + return this.listener.apply(this.target, arguments); + } +} + +function _onceWrap(target, type, listener) { + const state = { fired: false, wrapFn: undefined, target, type, listener }; + const wrapped = onceWrapper.bind(state); + wrapped.listener = listener; + state.wrapFn = wrapped; + return wrapped; +} + +/** + * Adds a one-time `listener` function to the event emitter. + * @param {string | symbol} type + * @param {Function} listener + * @returns {EventEmitter} + */ +EventEmitter.prototype.once = function once(type, listener) { + checkListener(listener); + + this.on(type, _onceWrap(this, type, listener)); + + return this; +}; + +/** + * Adds a one-time `listener` function to the beginning of + * the listeners array. + * @param {string | symbol} type + * @param {Function} listener + * @returns {EventEmitter} + */ +EventEmitter.prototype.prependOnceListener = + function prependOnceListener(type, listener) { + checkListener(listener); + + this.prependListener(type, _onceWrap(this, type, listener)); + + return this; + }; + +/** + * Removes the specified `listener` from the listeners array. + * @param {string | symbol} type + * @param {Function} listener + * @returns {EventEmitter} + */ +EventEmitter.prototype.removeListener = + function removeListener(type, listener) { + checkListener(listener); + + this[kImpl].removeListener(type, listener); + + return this; + }; + +EventEmitter.prototype.off = EventEmitter.prototype.removeListener; + +/** + * Removes all listeners from the event emitter. (Only + * removes listeners for a specific event name if specified + * as `type`). + * @param {string | symbol} [type] + * @returns {EventEmitter} + */ +EventEmitter.prototype.removeAllListeners = + function removeAllListeners(type) { + this[kImpl].removeAllListeners.apply(this[kImpl], arguments); + return this; + }; + +function _listeners(target, type, unwrap) { + const events = target._events; + + if (events === undefined) + return []; + + const evlistener = events[type]; + if (evlistener === undefined) + return []; + + if (typeof evlistener === 'function') + return unwrap ? [evlistener.listener || evlistener] : [evlistener]; + + return unwrap ? + unwrapListeners(evlistener) : arrayClone(evlistener); +} + +/** + * Returns a copy of the array of listeners for the event name + * specified as `type`. + * @param {string | symbol} type + * @returns {Function[]} + */ +EventEmitter.prototype.listeners = function listeners(type) { + return _listeners(this, type, true); +}; + +/** + * Returns a copy of the array of listeners and wrappers for + * the event name specified as `type`. + * @param {string | symbol} type + * @returns {Function[]} + */ +EventEmitter.prototype.rawListeners = function rawListeners(type) { + return _listeners(this, type, false); +}; + +/** + * Returns the number of listeners listening to the event name + * specified as `type`. + * @deprecated since v3.2.0 + * @param {EventEmitter} emitter + * @param {string | symbol} type + * @returns {number} + */ +EventEmitter.listenerCount = function(emitter, type) { + if (typeof emitter.listenerCount === 'function') { + return emitter.listenerCount(type); + } + return FunctionPrototypeCall(listenerCount, emitter, type); +}; + +EventEmitter.prototype.listenerCount = listenerCount; + +/** + * Returns the number of listeners listening to event name + * specified as `type`. + * @param {string | symbol} type + * @param {Function} listener + * @returns {number} + */ +function listenerCount(type, listener) { + const events = this._events; + + if (events !== undefined) { + const evlistener = events[type]; + + if (typeof evlistener === 'function') { + if (listener != null) { + return listener === evlistener || listener === evlistener.listener ? 1 : 0; + } + + return 1; + } else if (evlistener !== undefined) { + if (listener != null) { + let matching = 0; + + for (let i = 0, l = evlistener.length; i < l; i++) { + if (evlistener[i] === listener || evlistener[i].listener === listener) { + matching++; + } + } + + return matching; + } + + return evlistener.length; + } + } + + return 0; +} + +/** + * Returns an array listing the events for which + * the emitter has registered listeners. + * @returns {any[]} + */ +EventEmitter.prototype.eventNames = function eventNames() { + return this._eventsCount > 0 ? ReflectOwnKeys(this._events) : []; +}; + +function unwrapListeners(arr) { + const ret = arrayClone(arr); + for (let i = 0; i < ret.length; ++i) { + const orig = ret[i].listener; + if (typeof orig === 'function') + ret[i] = orig; + } + return ret; +} diff --git a/lib/internal/events/fast_event_emitter.js b/lib/internal/events/fast_event_emitter.js new file mode 100644 index 00000000000000..909d3a79e55468 --- /dev/null +++ b/lib/internal/events/fast_event_emitter.js @@ -0,0 +1,169 @@ +'use strict'; + +const { kShapeMode, kErrorMonitor } = require('internal/events/symbols'); +const { + throwErrorOnMissingErrorHandler, + addCatch, +} = require('internal/events/shared_internal_event_emitter'); + +/** + * This class is optimized for the case where there is only a single listener for each event. + * it supports kCapture + * but does not support event types that {@link isEventUnsupportedForFastPath} returns false for + */ +function FastEventEmitter(eventEmitterTranslationLayer, _events) { + this.eventEmitterTranslationLayer = eventEmitterTranslationLayer; + this._events = _events; +} + +/** + * The events are stored here as Record + */ +FastEventEmitter.prototype._events = undefined; +FastEventEmitter.prototype[kShapeMode] = undefined; +FastEventEmitter.prototype._eventsCount = 0; +FastEventEmitter.prototype.eventEmitterTranslationLayer = undefined; + +/** + * Synchronously calls each of the listeners registered + * for the event. + * @param {string | symbol} type + * @param {...any} [args] + * @returns {boolean} + */ +FastEventEmitter.prototype.emit = function emit(type, ...args) { + const events = this._events; + + if (type === 'error' && events?.error === undefined) { + throwErrorOnMissingErrorHandler.apply(this.eventEmitterTranslationLayer, args); + } + + const handler = events[type]; + + if (handler === undefined) { + return false; + } + + const result = handler.apply(this.eventEmitterTranslationLayer, args); + + // We check if result is undefined first because that + // is the most common case so we do not pay any perf + // penalty + if (result !== undefined && result !== null) { + addCatch(this.eventEmitterTranslationLayer, result, type, args); + } + + return true; +}; + +FastEventEmitter.prototype.isListenerAlreadyExists = function isListenerAlreadyExists(type) { + return this._events?.[type] !== undefined; +}; + +/** + * Adds a listener to the event emitter. + * @param {string | symbol} type + * @param {Function} listener + * @param {boolean} prepend not used here as we are in fast mode and only have single listener + */ +FastEventEmitter.prototype.addListener = function addListener(type, listener, prepend = undefined) { + let events; + + events = this._events; + if (events === undefined) { + events = this._events = {}; + this._eventsCount = 0; + + // Not emitting `newListener` here as in fast path we don't have it + } + + // Optimize the case of one listener. Don't need the extra array object. + events[type] = listener; + ++this._eventsCount; +}; + +/** + * Removes the specified `listener`. + * @param {string | symbol} type + * @param {Function} listener + */ +FastEventEmitter.prototype.removeListener = function removeListener(type, listener) { + const events = this._events; + if (events === undefined) + return undefined; + + const list = events[type]; + if (list === undefined || (list !== listener && list.listener !== listener)) + return undefined; + + this._eventsCount -= 1; + + if (this[kShapeMode]) { + events[type] = undefined; + } else if (this._eventsCount === 0) { + this._events = { }; + } else { + delete events[type]; + // Not emitting `removeListener` here as in fast path we don't have it + } +}; + + +/** + * Removes all listeners from the event emitter. (Only + * removes listeners for a specific event name if specified + * as `type`). + * @param {string | symbol} [type] + */ +FastEventEmitter.prototype.removeAllListeners = function removeAllListeners(type) { + const events = this._events; + if (events === undefined) + return undefined; + + if (arguments.length === 0) { + this._events = { }; + this._eventsCount = 0; + } else if (events[type] !== undefined) { + if (--this._eventsCount === 0) + this._events = { }; + else + delete events[type]; + } + + this[kShapeMode] = false; + + return undefined; +}; + + +function isEventUnsupportedForFastPath(type) { + return ( + // Not supporting newListener and removeListener events in fast path + // as they can add new listeners in the middle of the process + type === 'newListener' || + type === 'removeListener' || + + // Not supporting errorMonitor event as it has special handling + type === kErrorMonitor || + + // Not supporting Object prototype keys as fast path object is not an object with null prototype + type === 'constructor' || + type === '__defineGetter__' || + type === '__defineSetter__' || + type === 'hasOwnProperty' || + type === '__lookupGetter__' || + type === '__lookupSetter__' || + type === 'isPrototypeOf' || + type === 'propertyIsEnumerable' || + type === 'toString' || + type === 'valueOf' || + type === '__proto__' || + type === 'toLocaleString' + ); +} + + +module.exports = { + FastEventEmitter, + isEventUnsupportedForFastPath, +}; diff --git a/lib/internal/events/shared_internal_event_emitter.js b/lib/internal/events/shared_internal_event_emitter.js new file mode 100644 index 00000000000000..b22c0157c92b7a --- /dev/null +++ b/lib/internal/events/shared_internal_event_emitter.js @@ -0,0 +1,151 @@ +'use strict'; + +const { + ArrayPrototypeJoin, + ArrayPrototypeSlice, + ArrayPrototypeSplice, + Error, + ErrorCaptureStackTrace, + FunctionPrototypeBind, + ObjectDefineProperty, + StringPrototypeSplit, +} = primordials; + +const { kCapture, kRejection } = require('internal/events/symbols'); +const { inspect, identicalSequenceRange } = require('internal/util/inspect'); + +const { + codes: { + ERR_UNHANDLED_ERROR, + }, + kEnhanceStackBeforeInspector, +} = require('internal/errors'); + +let EventEmitter; + +function arrayClone(arr) { + // At least since V8 8.3, this implementation is faster than the previous + // which always used a simple for-loop + switch (arr.length) { + case 2: return [arr[0], arr[1]]; + case 3: return [arr[0], arr[1], arr[2]]; + case 4: return [arr[0], arr[1], arr[2], arr[3]]; + case 5: return [arr[0], arr[1], arr[2], arr[3], arr[4]]; + case 6: return [arr[0], arr[1], arr[2], arr[3], arr[4], arr[5]]; + } + return ArrayPrototypeSlice(arr); +} + + +function addCatch(that, promise, type, args) { + if (!that[kCapture]) { + return; + } + + // Handle Promises/A+ spec, then could be a getter + // that throws on second use. + try { + const then = promise.then; + + if (typeof then === 'function') { + then.call(promise, undefined, function(err) { + // The callback is called with nextTick to avoid a follow-up + // rejection from this promise. + process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args); + }); + } + } catch (err) { + that.emit('error', err); + } +} + +function emitUnhandledRejectionOrErr(ee, err, type, args) { + if (typeof ee[kRejection] === 'function') { + ee[kRejection](err, type, ...args); + } else { + // We have to disable the capture rejections mechanism, otherwise + // we might end up in an infinite loop. + const prev = ee[kCapture]; + + // If the error handler throws, it is not catchable and it + // will end up in 'uncaughtException'. We restore the previous + // value of kCapture in case the uncaughtException is present + // and the exception is handled. + try { + ee[kCapture] = false; + ee.emit('error', err); + } finally { + ee[kCapture] = prev; + } + } +} + + +function throwErrorOnMissingErrorHandler(...args) { + let er; + if (args.length > 0) + er = args[0]; + + if (er instanceof Error) { + try { + const capture = {}; + EventEmitter ??= require('internal/events/event_emitter').EventEmitter; + ErrorCaptureStackTrace(capture, EventEmitter.prototype.emit); + ObjectDefineProperty(er, kEnhanceStackBeforeInspector, { + __proto__: null, + value: FunctionPrototypeBind(enhanceStackTrace, this, er, capture), + configurable: true, + }); + } catch { + // Continue regardless of error. + } + + // Note: The comments on the `throw` lines are intentional, they show + // up in Node's output if this results in an unhandled exception. + throw er; // Unhandled 'error' event + } + + let stringifiedEr; + try { + stringifiedEr = inspect(er); + } catch { + stringifiedEr = er; + } + + // At least give some kind of context to the user + const err = new ERR_UNHANDLED_ERROR(stringifiedEr); + err.context = er; + throw err; // Unhandled 'error' event +} + + +function enhanceStackTrace(err, own) { + let ctorInfo = ''; + try { + const { name } = this.constructor; + if (name !== 'EventEmitter') + ctorInfo = ` on ${name} instance`; + } catch { + // Continue regardless of error. + } + const sep = `\nEmitted 'error' event${ctorInfo} at:\n`; + + const errStack = ArrayPrototypeSlice( + StringPrototypeSplit(err.stack, '\n'), 1); + const ownStack = ArrayPrototypeSlice( + StringPrototypeSplit(own.stack, '\n'), 1); + + const { len, offset } = identicalSequenceRange(ownStack, errStack); + if (len > 0) { + ArrayPrototypeSplice(ownStack, offset + 1, len - 2, + ' [... lines matching original stack trace ...]'); + } + + return err.stack + sep + ArrayPrototypeJoin(ownStack, '\n'); +} + +module.exports = { + arrayClone, + addCatch, + throwErrorOnMissingErrorHandler, +}; diff --git a/lib/internal/events/slow_event_emitter.js b/lib/internal/events/slow_event_emitter.js new file mode 100644 index 00000000000000..c7a2b9136fd016 --- /dev/null +++ b/lib/internal/events/slow_event_emitter.js @@ -0,0 +1,302 @@ +'use strict'; + +const { + ObjectAssign, + ObjectPrototype, + ObjectPrototypeHasOwnProperty, + ReflectOwnKeys, + String, +} = primordials; + +let spliceOne; + +const { kErrorMonitor, kShapeMode } = require('internal/events/symbols'); +const { + arrayClone, + addCatch, + throwErrorOnMissingErrorHandler, +} = require('internal/events/shared_internal_event_emitter'); +const { genericNodeError } = require('internal/errors'); +const { inspect } = require('internal/util/inspect'); + +let _getMaxListeners; +let protoExists = true; + +try { + // Can't use getOptionValue('--disable-proto') here as it's not available yet + + // Checking whether --disable-proto is equal 'throw' + // eslint-disable-next-line no-proto + protoExists = !!({}).__proto__; + + // Checking whether --disable-proto is equal 'delete' + protoExists = ObjectPrototypeHasOwnProperty(ObjectPrototype, '__proto__'); +} catch { + protoExists = false; +} + +function SlowEventEmitter(eventEmitterTranslationLayer, events = undefined) { + this.eventEmitterTranslationLayer = eventEmitterTranslationLayer; + this._events = events; +} + +SlowEventEmitter.prototype._events = undefined; +SlowEventEmitter.prototype._eventsCount = 0; +SlowEventEmitter.prototype.eventEmitterTranslationLayer = undefined; +SlowEventEmitter.prototype[kShapeMode] = undefined; + +/** + * @param {FastEventEmitter} fastEventEmitter + */ +SlowEventEmitter.fromFastEventEmitter = function fromFastEventEmitter(fastEventEmitter) { + let events = fastEventEmitter._events; + // To support weird cases where the event name is object keys such as __defineGetter__ + if (protoExists) { + // eslint-disable-next-line no-proto + events.__proto__ = null; + } else { + events = ObjectAssign({ __proto__: null }, events); + } + const eventEmitter = new SlowEventEmitter(fastEventEmitter.eventEmitterTranslationLayer, events); + + eventEmitter._eventsCount = fastEventEmitter._eventsCount; + eventEmitter[kShapeMode] = fastEventEmitter[kShapeMode]; + + return eventEmitter; +}; + +/** + * Synchronously calls each of the listeners registered + * for the event. + * @param {string | symbol} type + * @param {...any} [args] + * @returns {boolean} + */ +SlowEventEmitter.prototype.emit = function emit(type, ...args) { + let doError = (type === 'error'); + const eventEmitterTranslationLayer = this.eventEmitterTranslationLayer; + + const events = this._events; + if (events !== undefined) { + if (doError && events[kErrorMonitor] !== undefined) + this.emit(kErrorMonitor, ...args); + doError = (doError && events.error === undefined); + } else if (!doError) + return false; + + // If there is no 'error' event listener then throw. + if (doError) { + throwErrorOnMissingErrorHandler.apply(eventEmitterTranslationLayer, args); + } + + const handler = events[type]; + + if (handler === undefined) + return false; + + if (typeof handler === 'function') { + const result = handler.apply(eventEmitterTranslationLayer, args); + + // We check if result is undefined first because that + // is the most common case so we do not pay any perf + // penalty + if (result !== undefined && result !== null) { + addCatch(eventEmitterTranslationLayer, result, type, args); + } + } else { + const len = handler.length; + const listeners = arrayClone(handler); + for (let i = 0; i < len; ++i) { + const result = listeners[i].apply(eventEmitterTranslationLayer, args); + + // We check if result is undefined first because that + // is the most common case so we do not pay any perf + // penalty. + // This code is duplicated because extracting it away + // would make it non-inlineable. + if (result !== undefined && result !== null) { + addCatch(eventEmitterTranslationLayer, result, type, args); + } + } + } + + return true; +}; + +/** + * Adds a listener to the event emitter. + * @param {string | symbol} type + * @param {Function} listener + * @param {boolean} prepend + */ +SlowEventEmitter.prototype.addListener = function addListener(type, listener, prepend) { + const target = this.eventEmitterTranslationLayer; + let m; + let events; + let existing; + + events = this._events; + if (events === undefined) { + events = this._events = { __proto__: null }; + this._eventsCount = 0; + } else { + // To avoid recursion in the case that type === "newListener"! Before + // adding it to the listeners, first emit "newListener". + if (events.newListener !== undefined) { + this.eventEmitterTranslationLayer.emit('newListener', type, + listener.listener ?? listener); + + // Re-assign `events` because a newListener handler could have caused the + // this._events to be assigned to a new object + events = this._events; + } + existing = events[type]; + } + + if (existing === undefined) { + // Optimize the case of one listener. Don't need the extra array object. + events[type] = listener; + ++this._eventsCount; + } else { + if (typeof existing === 'function') { + // Adding the second element, need to change to array. + existing = events[type] = + prepend ? [listener, existing] : [existing, listener]; + // If we've already got an array, just append. + } else if (prepend) { + existing.unshift(listener); + } else { + existing.push(listener); + } + + // Check for listener leak + _getMaxListeners ??= require('internal/events/event_emitter')._getMaxListeners; + m = _getMaxListeners(target); + if (m > 0 && existing.length > m && !existing.warned) { + existing.warned = true; + // No error code for this since it is a Warning + const w = genericNodeError( + `Possible EventEmitter memory leak detected. ${existing.length} ${String(type)} listeners ` + + `added to ${inspect(target, { depth: -1 })}. MaxListeners is ${m}. Use emitter.setMaxListeners() to increase limit`, + { name: 'MaxListenersExceededWarning', emitter: target, type: type, count: existing.length }); + process.emitWarning(w); + } + } +}; + +/** + * Removes the specified `listener`. + * @param {string | symbol} type + * @param {Function} listener + * @returns {EventEmitter} + */ +SlowEventEmitter.prototype.removeListener = function removeListener(type, listener) { + const events = this._events; + if (events === undefined) + return undefined; + + const list = events[type]; + if (list === undefined) + return undefined; + + if (list === listener || list.listener === listener) { + this._eventsCount -= 1; + + if (this[kShapeMode]) { + events[type] = undefined; + } else if (this._eventsCount === 0) { + this._events = { __proto__: null }; + } else { + delete events[type]; + if (events.removeListener) + this.emit('removeListener', type, list.listener || listener); + } + } else if (typeof list !== 'function') { + let position = -1; + + for (let i = list.length - 1; i >= 0; i--) { + if (list[i] === listener || list[i].listener === listener) { + position = i; + break; + } + } + + if (position < 0) + return undefined; + + if (position === 0) + list.shift(); + else { + if (spliceOne === undefined) + spliceOne = require('internal/util').spliceOne; + spliceOne(list, position); + } + + if (list.length === 1) + events[type] = list[0]; + + if (events.removeListener !== undefined) + this.emit('removeListener', type, listener); + } + + return undefined; +}; + +/** + * Removes all listeners from the event emitter. (Only + * removes listeners for a specific event name if specified + * as `type`). + * @param {string | symbol} [type] + * @returns {EventEmitter} + */ +SlowEventEmitter.prototype.removeAllListeners = function removeAllListeners(type) { + const events = this._events; + if (events === undefined) + return undefined; + + // Not listening for removeListener, no need to emit + if (events.removeListener === undefined) { + if (arguments.length === 0) { + this._events = { __proto__: null }; + this._eventsCount = 0; + } else if (events[type] !== undefined) { + if (--this._eventsCount === 0) + this._events = { __proto__: null }; + else + delete events[type]; + } + this[kShapeMode] = false; + return undefined; + } + + // Emit removeListener for all listeners on all events + if (arguments.length === 0) { + for (const key of ReflectOwnKeys(events)) { + if (key === 'removeListener') continue; + this.eventEmitterTranslationLayer.removeAllListeners(key); + } + this.eventEmitterTranslationLayer.removeAllListeners('removeListener'); + this._events = { __proto__: null }; + this._eventsCount = 0; + this[kShapeMode] = false; + return undefined; + } + + const listeners = events[type]; + + if (typeof listeners === 'function') { + this.eventEmitterTranslationLayer.removeListener(type, listeners); + } else if (listeners !== undefined) { + // LIFO order + for (let i = listeners.length - 1; i >= 0; i--) { + this.eventEmitterTranslationLayer.removeListener(type, listeners[i]); + } + } + + return undefined; +}; + + +module.exports = { + SlowEventEmitter, +}; diff --git a/lib/internal/events/symbols.js b/lib/internal/events/symbols.js index b1b89ddb8f0a4d..9f15f874cb5223 100644 --- a/lib/internal/events/symbols.js +++ b/lib/internal/events/symbols.js @@ -2,10 +2,36 @@ const { Symbol, + SymbolFor, } = primordials; const kFirstEventParam = Symbol('nodejs.kFirstEventParam'); +const kInitialEvents = Symbol('kInitialEvents'); + +const kCapture = Symbol('kCapture'); +const kErrorMonitor = Symbol('events.errorMonitor'); +const kShapeMode = Symbol('shapeMode'); +const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners'); +const kMaxEventTargetListenersWarned = + Symbol('events.maxEventTargetListenersWarned'); +const kWatermarkData = SymbolFor('nodejs.watermarkData'); + +const kImpl = Symbol('kImpl'); +const kIsFastPath = Symbol('kIsFastPath'); +const kSwitchToSlowPath = Symbol('kSwitchToSlowPath'); +const kRejection = SymbolFor('nodejs.rejection'); module.exports = { kFirstEventParam, + kInitialEvents, + kCapture, + kErrorMonitor, + kShapeMode, + kMaxEventTargetListeners, + kMaxEventTargetListenersWarned, + kWatermarkData, + kImpl, + kIsFastPath, + kSwitchToSlowPath, + kRejection, }; diff --git a/lib/internal/streams/duplex.js b/lib/internal/streams/duplex.js index 6892150c49f659..3331b3f7ec01d5 100644 --- a/lib/internal/streams/duplex.js +++ b/lib/internal/streams/duplex.js @@ -38,6 +38,7 @@ module.exports = Duplex; const Stream = require('internal/streams/legacy').Stream; const Readable = require('internal/streams/readable'); const Writable = require('internal/streams/writable'); +const { kInitialEvents } = require('internal/events/symbols'); const { addAbortSignal, @@ -66,23 +67,26 @@ function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); - this._events ??= { - close: undefined, - error: undefined, - prefinish: undefined, - finish: undefined, - drain: undefined, - data: undefined, - end: undefined, - readable: undefined, - // Skip uncommon events... - // pause: undefined, - // resume: undefined, - // pipe: undefined, - // unpipe: undefined, - // [destroyImpl.kConstruct]: undefined, - // [destroyImpl.kDestroy]: undefined, - }; + const thisEvents = this._events; + if (thisEvents === undefined || thisEvents === null) { + this[kInitialEvents] ??= { + close: undefined, + error: undefined, + prefinish: undefined, + finish: undefined, + drain: undefined, + data: undefined, + end: undefined, + readable: undefined, + // Skip uncommon events... + // pause: undefined, + // resume: undefined, + // pipe: undefined, + // unpipe: undefined, + // [destroyImpl.kConstruct]: undefined, + // [destroyImpl.kDestroy]: undefined, + }; + } this._readableState = new Readable.ReadableState(options, this, true); this._writableState = new Writable.WritableState(options, this, true); diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 1cba326970d4a8..c8f46deb979516 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -43,6 +43,7 @@ Readable.ReadableState = ReadableState; const EE = require('events'); const { Stream, prependListener } = require('internal/streams/legacy'); +const { kInitialEvents, kImpl, kShapeMode, kIsFastPath } = require('internal/events/symbols'); const { Buffer } = require('buffer'); const { @@ -91,6 +92,7 @@ const FastBuffer = Buffer[SymbolSpecies]; const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); +const {FastEventEmitter} = require("internal/events/fast_event_emitter"); ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); @@ -319,20 +321,24 @@ function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); - this._events ??= { - close: undefined, - error: undefined, - data: undefined, - end: undefined, - readable: undefined, - // Skip uncommon events... - // pause: undefined, - // resume: undefined, - // pipe: undefined, - // unpipe: undefined, - // [destroyImpl.kConstruct]: undefined, - // [destroyImpl.kDestroy]: undefined, - }; + if (this[kImpl] === undefined) { + this[kIsFastPath] = true; + this[kImpl] = new FastEventEmitter(this, this[kInitialEvents] === undefined ? { + close: undefined, + error: undefined, + data: undefined, + end: undefined, + readable: undefined, + // Skip uncommon events... + // pause: undefined, + // resume: undefined, + // pipe: undefined, + // unpipe: undefined, + // [destroyImpl.kConstruct]: undefined, + // [destroyImpl.kDestroy]: undefined, + } : this[kInitialEvents]); + this[kImpl][kShapeMode] = true; + } this._readableState = new ReadableState(options, this, false); diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index a039e60b16c2c2..b7f9a5f90eec6d 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -44,6 +44,7 @@ const EE = require('events'); const Stream = require('internal/streams/legacy').Stream; const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); +const { kInitialEvents } = require('internal/events/symbols'); const { addAbortSignal, @@ -385,16 +386,19 @@ function Writable(options) { if (!(this instanceof Writable)) return new Writable(options); - this._events ??= { - close: undefined, - error: undefined, - prefinish: undefined, - finish: undefined, - drain: undefined, - // Skip uncommon events... - // [destroyImpl.kConstruct]: undefined, - // [destroyImpl.kDestroy]: undefined, - }; + const thisEvents = this._events; + if (thisEvents === undefined || thisEvents === null) { + this[kInitialEvents] ??= { + close: undefined, + error: undefined, + prefinish: undefined, + finish: undefined, + drain: undefined, + // Skip uncommon events... + // [destroyImpl.kConstruct]: undefined, + // [destroyImpl.kDestroy]: undefined, + }; + } this._writableState = new WritableState(options, this, false); diff --git a/test/fixtures/errors/events_unhandled_error_common_trace.snapshot b/test/fixtures/errors/events_unhandled_error_common_trace.snapshot index a482c105b75e48..9e1e47f86d6ec8 100644 --- a/test/fixtures/errors/events_unhandled_error_common_trace.snapshot +++ b/test/fixtures/errors/events_unhandled_error_common_trace.snapshot @@ -1,6 +1,6 @@ -node:events:* - throw er; * Unhandled 'error' event - ^ +node:internal*events*shared_internal_event_emitter:* + throw er; * Unhandled 'error' event + ^ Error: foo:bar at bar (*events_unhandled_error_common_trace.js:*:*) diff --git a/test/fixtures/errors/events_unhandled_error_nexttick.snapshot b/test/fixtures/errors/events_unhandled_error_nexttick.snapshot index 450d4910a385b5..abcdb9da0d7224 100644 --- a/test/fixtures/errors/events_unhandled_error_nexttick.snapshot +++ b/test/fixtures/errors/events_unhandled_error_nexttick.snapshot @@ -1,6 +1,6 @@ -node:events:* - throw er; * Unhandled 'error' event - ^ +node:internal*events*shared_internal_event_emitter:* + throw er; * Unhandled 'error' event + ^ Error at Object. (*events_unhandled_error_nexttick.js:*:*) diff --git a/test/fixtures/errors/events_unhandled_error_sameline.snapshot b/test/fixtures/errors/events_unhandled_error_sameline.snapshot index 520601e617083c..6c7a7d78b8b44c 100644 --- a/test/fixtures/errors/events_unhandled_error_sameline.snapshot +++ b/test/fixtures/errors/events_unhandled_error_sameline.snapshot @@ -1,6 +1,6 @@ -node:events:* - throw er; * Unhandled 'error' event - ^ +node:internal*events*shared_internal_event_emitter:* + throw er; * Unhandled 'error' event + ^ Error at Object. (*events_unhandled_error_sameline.js:*:*) diff --git a/test/fixtures/errors/events_unhandled_error_subclass.snapshot b/test/fixtures/errors/events_unhandled_error_subclass.snapshot index 6a9cfd4a1a0b21..a5c6e9ce829f6a 100644 --- a/test/fixtures/errors/events_unhandled_error_subclass.snapshot +++ b/test/fixtures/errors/events_unhandled_error_subclass.snapshot @@ -1,6 +1,6 @@ -node:events:* - throw er; * Unhandled 'error' event - ^ +node:internal*events*shared_internal_event_emitter:* + throw er; * Unhandled 'error' event + ^ Error at Object. (*events_unhandled_error_subclass.js:*:*) diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 6327fbeb2e7e1b..e3d67116599141 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -100,6 +100,11 @@ expected.beforePreExec = new Set([ 'NativeModule internal/modules/cjs/loader', 'Internal Binding wasm_web_api', 'NativeModule internal/events/abort_listener', + 'NativeModule internal/events/event_emitter', + 'NativeModule internal/events/fast_event_emitter', + 'NativeModule internal/events/shared_internal_event_emitter', + 'NativeModule internal/events/slow_event_emitter', + 'NativeModule internal/events/symbols', ]); expected.atRunTime = new Set([ diff --git a/test/parallel/test-event-emitter-listeners-side-effects.js b/test/parallel/test-event-emitter-listeners-side-effects.js index 3e427c4c284ea2..c11b48d733f167 100644 --- a/test/parallel/test-event-emitter-listeners-side-effects.js +++ b/test/parallel/test-event-emitter-listeners-side-effects.js @@ -32,7 +32,6 @@ let fl; // foo listeners fl = e.listeners('foo'); assert(Array.isArray(fl)); assert.strictEqual(fl.length, 0); -assert(!(e._events instanceof Object)); assert.deepStrictEqual(Object.keys(e._events), []); e.on('foo', assert.fail); diff --git a/test/parallel/test-event-emitter-set-max-listeners-side-effects.js b/test/parallel/test-event-emitter-set-max-listeners-side-effects.js index 8e66e999a54cab..79d90b322de94d 100644 --- a/test/parallel/test-event-emitter-set-max-listeners-side-effects.js +++ b/test/parallel/test-event-emitter-set-max-listeners-side-effects.js @@ -26,7 +26,6 @@ const events = require('events'); const e = new events.EventEmitter(); -assert(!(e._events instanceof Object)); assert.deepStrictEqual(Object.keys(e._events), []); e.setMaxListeners(5); assert.deepStrictEqual(Object.keys(e._events), []); diff --git a/test/parallel/test-event-emitter-special-event-names.js b/test/parallel/test-event-emitter-special-event-names.js index f34faba9468cc2..d6d0c0c5310c37 100644 --- a/test/parallel/test-event-emitter-special-event-names.js +++ b/test/parallel/test-event-emitter-special-event-names.js @@ -9,9 +9,6 @@ const handler = () => {}; assert.deepStrictEqual(ee.eventNames(), []); -assert.strictEqual(ee._events.hasOwnProperty, undefined); -assert.strictEqual(ee._events.toString, undefined); - ee.on('__proto__', handler); ee.on('__defineGetter__', handler); ee.on('toString', handler); @@ -35,3 +32,11 @@ process.on('__proto__', common.mustCall(function(val) { assert.strictEqual(val, 1); })); process.emit('__proto__', 1); + +const objectPrototypeKeys = Object.getOwnPropertyNames(Object.getPrototypeOf({})); + +assert.notDeepStrictEqual(objectPrototypeKeys, []); +for (const key of objectPrototypeKeys) { + const ee2 = new EventEmitter(); + ee2.emit(key, 1); +} diff --git a/test/parallel/test-event-emitter-subclass.js b/test/parallel/test-event-emitter-subclass.js index a6ef54e5fa7401..396faf84b64ce7 100644 --- a/test/parallel/test-event-emitter-subclass.js +++ b/test/parallel/test-event-emitter-subclass.js @@ -47,7 +47,6 @@ assert.throws(function() { }, /blerg/); process.on('exit', function() { - assert(!(myee._events instanceof Object)); assert.deepStrictEqual(Object.keys(myee._events), []); console.log('ok'); });