1717
1818import { config } from './config.js' ;
1919
20- import { EventEmitter } from 'node:events' ;
2120import { strict as assert } from 'node:assert' ;
2221import { promisify } from 'node:util' ;
2322
23+ import EventEmitter from 'eventemitter3' ;
2424import type { FastifyPluginAsync } from 'fastify' ;
25- import { JsonPointer } from 'json-ptr' ;
2625import type LightMyRequest from 'light-my-request' ;
2726import type WebSocket from 'ws' ;
2827import fastifyWebsocket from '@fastify/websocket' ;
@@ -54,13 +53,13 @@ const warn = log('websockets:warn');
5453const debug = log ( 'websockets:debug' ) ;
5554const trace = log ( 'websockets:trace' ) ;
5655
57- const emitter = new EventEmitter ( ) ;
56+ const emitter = new EventEmitter < string , { change : Change } > ( ) ;
5857
5958class Watch {
6059 /**
61- * @description Maps requestId to path_leftover
60+ * @description Set of `requestId`s
6261 */
63- readonly requests = new Map < string , string > ( ) ;
62+ readonly requests = new Set < string > ( ) ;
6463 readonly resourceId ;
6564
6665 readonly #send: ( value : unknown ) => Promise < void > ;
@@ -82,9 +81,9 @@ class Watch {
8281 emitter . removeListener ( this . resourceId , this . #cb) ;
8382 }
8483
85- async sendChange ( resp : SocketChange ) {
86- trace ( resp , 'Sending change' ) ;
87- await this . #send( JSON . stringify ( resp ) ) ;
84+ async sendChange ( change : SocketChange ) {
85+ trace ( { change } , 'Sending change' ) ;
86+ await this . #send( JSON . stringify ( change ) ) ;
8887 }
8988
9089 #handler( { change } : { change : Change } ) : void {
@@ -94,30 +93,10 @@ class Watch {
9493 const message = {
9594 resourceId,
9695 change,
97- requestId : [ ] as string [ ] ,
98- path_leftover : [ ] as string [ ] ,
96+ requestId : Array . from ( requests ) as [ string , ...string [ ] ] ,
9997 } ;
100- for ( const [ requestId , pathLeftover ] of requests ) {
101- // Find requests with changes
102- const pathChange : unknown = JsonPointer . get (
103- change ?. [ 0 ] ?. body ?? { } ,
104- pathLeftover
105- ) ;
106- if ( pathChange === undefined ) {
107- // No relevant change
108- continue ;
109- }
110-
111- message . requestId . push ( requestId ) ;
112- message . path_leftover . push ( pathLeftover ) ;
113- }
114-
115- if ( message . requestId . length <= 0 ) {
116- // No-one to notify?
117- return ;
118- }
11998
120- void this . sendChange ( message as SocketChange ) ;
99+ void this . sendChange ( message ) ;
121100 }
122101}
123102
@@ -382,7 +361,7 @@ const plugin: FastifyPluginAsync = async (fastify) => {
382361
383362 const watch =
384363 watches . get ( resourceId ) ?? new Watch ( resourceId , socket ) ;
385- watch . requests . set ( message . requestId , pathLeftover ) ;
364+ watch . requests . add ( message . requestId ) ;
386365 watches . set ( resourceId , watch ) ;
387366
388367 // Emit all new changes from the given rev in the request
@@ -429,7 +408,6 @@ const plugin: FastifyPluginAsync = async (fastify) => {
429408 await watch . sendChange ( {
430409 requestId : [ message . requestId ] ,
431410 resourceId,
432- path_leftover : [ pathLeftover ] ,
433411 change,
434412 } ) ;
435413 }
@@ -517,14 +495,18 @@ const plugin: FastifyPluginAsync = async (fastify) => {
517495 return ;
518496 }
519497
498+ if ( emitter . listeners ( request . resource_id ) . length === 0 ) {
499+ // No WATCHes
500+ return ;
501+ }
502+
520503 try {
521504 const change = await changes . getChangeArray (
522505 request . resource_id ,
523506 request . _rev
524507 ) ;
525508 trace ( { change } , `Emitted change for ${ request . resource_id } ` ) ;
526509 emitter . emit ( request . resource_id , {
527- path_leftover : request . path_leftover ,
528510 change,
529511 } ) ;
530512 if ( change ?. [ 0 ] ?. type === 'delete' ) {
0 commit comments