Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ describe('Context', () => {
it('reports error if an observer fails', () => {
ctx.bind('bar').to('bar-value');
return expect(ctx.waitUntilObserversNotified()).to.be.rejectedWith(
'something wrong',
'something wrong - async',
);
});

Expand Down Expand Up @@ -239,7 +239,7 @@ describe('Context', () => {
filter: binding => binding.key === 'bar',
observe: async () => {
await setImmediateAsync();
throw new Error('something wrong');
throw new Error('something wrong - async');
},
};
ctx.subscribe(nonMatchingObserver);
Expand Down Expand Up @@ -282,20 +282,21 @@ describe('Context', () => {
});

it('reports error on current context if an observer fails', async () => {
const err = new Error('something wrong');
const err = new Error('something wrong - 1');
server.subscribe((event, binding) => {
if (binding.key === 'bar') {
return Promise.reject(err);
}
});
server.bind('bar').to('bar-value');
// Please note the following code registers an `error` listener on `server`
// so that error events are caught before it is reported as unhandled.
const obj = await pEvent(server, 'error');
expect(obj).to.equal(err);
});

it('reports error on the first context with error listeners on the chain', async () => {
const err = new Error('something wrong');
const err = new Error('something wrong - 2');
server.subscribe((event, binding) => {
if (binding.key === 'bar') {
return Promise.reject(err);
Expand Down
22 changes: 17 additions & 5 deletions packages/context/src/context-subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ export class ContextSubscriptionManager extends EventEmitter {
this.setupNotification('bind', 'unbind');

// Create an async iterator for the `notification` event as a queue
this.notificationQueue = iterator(this, 'notification');
this.notificationQueue = iterator(this, 'notification', {
// Do not end the iterator if an error event is emitted on the
// subscription manager
rejectionEvents: [],
});

return this.processNotifications();
}
Expand Down Expand Up @@ -257,11 +261,18 @@ export class ContextSubscriptionManager extends EventEmitter {
);
this.emitEvent('observersNotified', {type, binding, context});
} catch (err) {
this.pendingNotifications--;
// Do not reduce the pending notification count so that errors
// can be captured by waitUntilPendingNotificationsDone
this._debug('Error caught from observers', err);
// Errors caught from observers. Emit it to the current context.
// If no error listeners are registered, crash the process.
this.emitError(err);
// Errors caught from observers.
if (this.listenerCount('error') > 0) {
// waitUntilPendingNotificationsDone may be called
this.emitError(err);
} else {
// Emit it to the current context. If no error listeners are
// registered, crash the process.
this.handleNotificationError(err);
}
}
}
}
Expand Down Expand Up @@ -300,6 +311,7 @@ export class ContextSubscriptionManager extends EventEmitter {
*/
async waitUntilPendingNotificationsDone(timeout?: number) {
const count = this.pendingNotifications;
debug('Number of pending notifications: %d', count);
if (count === 0) return;
await multiple(this, 'observersNotified', {count, timeout});
}
Expand Down