11'use strict'
22
3- const conf = require ( '../config' )
3+ /**
4+ * Message Broker
5+ */
6+ const config = require ( '../config' )
47const errs = require ( './errors' )
58const Rsmq = require ( 'rsmq-worker' )
69const Throttle = require ( './throttle' )
710
811/**
9- * Message Broker
10- * Polls the queue server for messages, builds requests
12+ * @constructor
13+ * @classdesc Polls the queue server for messages, builds requests
1114 * for the app and handles errors
12- * @param {function } app - the function to call when a message is received
15+ * @param {QueueHandler } queueHandler - the function to call when a message is received
1316 */
14- var Broker = function ( app ) {
15- // get config options
16- var opts = conf . get ( 'broker' )
17-
18- // instantiate message queue
19- this . rsmq = new Rsmq ( opts . queue )
17+ var Broker = function ( queueHandler ) {
18+ var opts = config . get ( 'broker' )
2019
21- // instantiate throttle
22- var throttle = new Throttle ( opts . throttle , ( start , stop ) => {
23- if ( start ) {
24- this . rsmq . start ( )
25- }
26-
27- if ( stop ) {
28- this . rsmq . stop ( )
29- }
30- } )
20+ this . queueHandler = queueHandler
21+ this . rsmq = this . initialiseQueue ( opts )
22+ this . throttle = this . initialiseThrottle ( opts )
3123
3224 // message received
3325 this . rsmq . on ( 'message' , ( msg , next ) => {
34- throttle . more ( )
26+ this . throttle . more ( )
3527 next ( false ) // next message but don't delete
3628 } )
3729
@@ -51,54 +43,67 @@ var Broker = function (app) {
5143 }
5244
5345 // call the app and delete message on success
54- console . log ( 'Broker calling app' )
55- app . handle ( null , req , ( err ) => {
56- console . log ( 'response from app' )
57- if ( processResponse ( req , err ) ) {
46+ this . queueHandler . handle ( null , req , ( err ) => {
47+ if ( this . processResponse ( req , err ) ) {
5848 this . rsmq . del ( msg . id )
5949 }
6050
61- throttle . less ( )
51+ this . throttle . less ( )
6252 } )
53+ } )
6354
64- // done if no error and no retries remaining
65- function processResponse ( req , err ) {
66- var queueError
55+ // queue error
56+ this . rsmq . on ( 'error' , ( err , msg ) => {
57+ this . queueHandler . handle ( new errs . BrokerError ( err , msg . message ) )
58+ } )
6759
68- if ( err ) {
69- queueError = new errs . WorkerError ( err , req . message )
70- }
60+ // start listening
61+ return this . rsmq . start ( )
62+ }
7163
72- if ( err && ! req . retries ) {
73- queueError = new errs . ExceededError ( req . message )
74- }
64+ /**
65+ * done if no error and no retries remaining
66+ */
67+ Broker . prototype . processResponse = function ( req , err ) {
68+ var queueError
7569
76- if ( ! req . address . length ) {
77- queueError = new errs . InvalidError ( req . message )
78- }
70+ if ( err ) {
71+ queueError = new errs . WorkerError ( err , req . message )
72+ }
7973
80- if ( Date . now ( ) / 1000 > req . timeout ) {
81- queueError = new errs . TimeoutError ( req . message )
82- }
74+ if ( err && ! req . retries ) {
75+ queueError = new errs . ExceededError ( req . message )
76+ }
8377
84- if ( queueError ) {
85- app . handle ( queueError )
86- }
78+ if ( ! req . address . length ) {
79+ queueError = new errs . InvalidError ( req . message )
80+ }
8781
88- return ! err || ! req . retries
89- }
90- } )
82+ if ( Date . now ( ) / 1000 > req . timeout ) {
83+ queueError = new errs . TimeoutError ( req . message )
84+ }
9185
92- // queue error
93- this . rsmq . on ( 'error' , ( err , msg ) => {
94- console . log ( err )
95- app ( new errs . BrokerError ( err , msg . message ) )
96- } )
86+ if ( queueError ) {
87+ this . queueHandler . handle ( queueError )
88+ }
9789
98- // start listening
99- return this . rsmq . start ( )
90+ return ! err || ! req . retries
10091}
10192
102- module . exports = function ( app ) {
103- return new Broker ( app )
93+ Broker . prototype . initialiseQueue = function ( options ) {
94+ return new Rsmq ( options . queue )
10495}
96+
97+ Broker . prototype . initialiseThrottle = function ( options ) {
98+ return new Throttle ( options . throttle , ( start , stop ) => {
99+ if ( start ) {
100+ this . rsmq . start ( )
101+ }
102+
103+ if ( stop ) {
104+ this . rsmq . stop ( )
105+ }
106+ } )
107+ }
108+
109+ module . exports = Broker
0 commit comments