diff --git a/README.md b/README.md index b7120ca..0e52d6c 100644 --- a/README.md +++ b/README.md @@ -227,6 +227,37 @@ It also supports an additional `rewriteRequestHeaders(headers, request)` functio opening the WebSocket connection. This function should return an object with the given headers. The default implementation forwards the `cookie` header. +## `wsReconnect` + +**Experimental.** (default: `disabled`) + +Reconnection feature detects and closes broken connections and reconnects automatically, see [how to detect and close broken connections](https://github.com/websockets/ws#how-to-detect-and-close-broken-connections). +The connection is considered broken if the target does not respond to the ping messages or no data is received from the target. + +The `wsReconnect` option contains the configuration for the WebSocket reconnection feature. +To enable the feature, set the `wsReconnect` option to an object with the following properties: + +- `pingInterval`: The interval between ping messages in ms (default: `30_000`). +- `maxReconnectionRetries`: The maximum number of reconnection retries (`1` to `Infinity`, default: `Infinity`). +- `reconnectInterval`: The interval between reconnection attempts in ms (default: `1_000`). +- `reconnectDecay`: The decay factor for the reconnection interval (default: `1.5`). +- `connectionTimeout`: The timeout for establishing the connection in ms (default: `5_000`). +- `reconnectOnClose`: Whether to reconnect on close, as long as the connection from the related client to the proxy is active (default: `false`). +- `logs`: Whether to log the reconnection process (default: `false`). + +See the example in [examples/reconnection](examples/reconnection). + +## wsHooks + +On websocket events, the following hooks are available, note **the hooks are all synchronous**. + +- `onIncomingMessage`: A hook function that is called when the request is received from the client `onIncomingMessage(source, target, { data, binary })` (default: `undefined`). +- `onOutgoingMessage`: A hook function that is called when the response is received from the target `onOutgoingMessage(source, target, { data, binary })` (default: `undefined`). +- `onConnect`: A hook function that is called when the connection is established `onConnect(source, target)` (default: `undefined`). +- `onDisconnect`: A hook function that is called when the connection is closed `onDisconnect(source)` (default: `undefined`). +- `onReconnect`: A hook function that is called when the connection is reconnected `onReconnect(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled. +- `onPong`: A hook function that is called when the target responds to the ping `onPong(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled. + ## Benchmarks The following benchmarks were generated on a dedicated server with an Intel(R) Core(TM) i7-7700 CPU @ 3.60GHz and 64GB of RAM: diff --git a/examples/reconnection/ReconnectionExample.md b/examples/reconnection/ReconnectionExample.md new file mode 100644 index 0000000..675e939 --- /dev/null +++ b/examples/reconnection/ReconnectionExample.md @@ -0,0 +1,60 @@ +# Reconnection Example + +This example demonstrates how to use the reconnection feature of the proxy. + +It simulates an unstable target service: slow to start, unresponsive due to block of the event loop, crash and restart. + +The goal is to ensures a more resilient and customizable integration, minimizing disruptions caused by connection instability. + + +## How to run + +Run the unstable target + +``` +cd examples/reconnection/unstable-target +npm run unstable +``` + +Run the proxy + +``` +cd examples/reconnection/proxy +npm run start +``` + +Then run the client + +``` +cd examples/reconnection/client +npm run start +``` + +--- + +## How it works + +### Proxy Connection Monitoring and Recovery + +The proxy monitors the target connection using a ping/pong mechanism. If a pong response does not arrive on time, the connection is closed, and the proxy attempts to reconnect. + +If the target service crashes, the connection may close either gracefully or abruptly. Regardless of how the disconnection occurs, the proxy detects the connection loss and initiates a reconnection attempt. + +### Connection Stability + +- The connection between the client and the proxy remains unaffected by an unstable target. +- The connection between the proxy and the target may be closed due to: +- The target failing to respond to ping messages, even if the connection is still technically open (e.g., due to a freeze or blockage). +- The target crashing and restarting. + +### Handling Data Loss During Reconnection + +The proxy supports hooks to manage potential data loss during reconnection. These hooks allow for custom logic to ensure message integrity when resending data from the client to the target. + +Examples of how hooks can be used based on the target service type: + +- GraphQL subscriptions: Resend the subscription from the last received message. +- Message brokers: Resend messages starting from the last successfully processed message. + +In this example, the proxy re-sends the messages from the last ping to ensure all the messages are sent to the target, without any additional logic. +Resending messages from the last pong ensures that the target does not miss any messages, but it may send messages more than once. diff --git a/examples/reconnection/client/index.js b/examples/reconnection/client/index.js new file mode 100644 index 0000000..deff07a --- /dev/null +++ b/examples/reconnection/client/index.js @@ -0,0 +1,75 @@ +'use strict' + +const WebSocket = require('ws') + +const port = process.env.PORT || 3001 + +// connect to proxy + +const url = `ws://localhost:${port}/` +const ws = new WebSocket(url) +const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true }) + +client.setEncoding('utf8') + +let i = 1 +setInterval(() => { + client.write(JSON.stringify({ + message: i + })) + i++ +}, 1000).unref() +const responses = {} + +client.on('data', message => { + const data = JSON.parse(message) + console.log('Received', data) + responses[data.response] = responses[data.response] ? responses[data.response] + 1 : 1 +}) + +client.on('error', error => { + console.log('Error') + console.error(error) +}) + +client.on('close', () => { + console.log('\n\n\nConnection closed') + + console.log('\n\n\nResponses') + for (const key in responses) { + if (!responses[key]) { + console.log('missing', key) + } else if (responses[key] !== 1) { + console.log('extra messages', key, responses[key]) + } + } +}) + +client.on('unexpected-response', (error) => { + console.log('Unexpected response') + console.error(error) +}) + +client.on('redirect', (error) => { + console.log('Redirect') + console.error(error) +}) + +client.on('upgrade', (error) => { + console.log('Upgrade') + console.error(error) +}) + +client.on('ping', (error) => { + console.log('Ping') + console.error(error) +}) + +client.on('pong', (error) => { + console.log('Pong') + console.error(error) +}) + +process.on('SIGINT', () => { + client.end() +}) diff --git a/examples/reconnection/client/package.json b/examples/reconnection/client/package.json new file mode 100644 index 0000000..0a0810a --- /dev/null +++ b/examples/reconnection/client/package.json @@ -0,0 +1,12 @@ +{ + "name": "client", + "version": "1.0.0", + "main": "index.js", + "scripts": { + "start": "node index.js", + "dev": "node --watch index.js" + }, + "dependencies": { + "ws": "^8.18.0" + } +} diff --git a/examples/reconnection/proxy/index.js b/examples/reconnection/proxy/index.js new file mode 100644 index 0000000..770ec67 --- /dev/null +++ b/examples/reconnection/proxy/index.js @@ -0,0 +1,70 @@ +'use strict' + +const { setTimeout: wait } = require('node:timers/promises') +const fastify = require('fastify') +const fastifyHttpProxy = require('../../../') + +async function main () { + const port = process.env.PORT || 3001 + + const wsReconnect = { + logs: true, + pingInterval: 3_000, + reconnectOnClose: true, + } + + let backup = [] + let lastPong = Date.now() + + // resend messages from last ping + // it may send messages more than once + // in case the target already received messages between last ping and the reconnection + async function resendMessages (target) { + const now = Date.now() + + for (const m of backup) { + if (m.timestamp < lastPong || m.timestamp > now) { + continue + } + console.log(' >>> resending message #', m) + target.send(m.message) + // introduce a small delay to avoid to flood the target + await wait(250) + } + }; + + const wsHooks = { + onPong: () => { + console.log('onPong') + lastPong = Date.now() + // clean backup from the last ping + backup = backup.filter(message => message.timestamp > lastPong) + }, + onIncomingMessage: (source, target, message) => { + const m = message.data.toString() + console.log('onIncomingMessage backup', m) + backup.push({ message: m, timestamp: Date.now() }) + }, + onDisconnect: () => { + console.log('onDisconnect') + backup.length = 0 + }, + onReconnect: (source, target) => { + console.log('onReconnect') + resendMessages(target) + }, + } + + const proxy = fastify({ logger: true }) + proxy.register(fastifyHttpProxy, { + upstream: 'http://localhost:3000/', + websocket: true, + wsUpstream: 'ws://localhost:3000/', + wsReconnect, + wsHooks, + }) + + await proxy.listen({ port }) +} + +main() diff --git a/examples/reconnection/proxy/package.json b/examples/reconnection/proxy/package.json new file mode 100644 index 0000000..450fb2b --- /dev/null +++ b/examples/reconnection/proxy/package.json @@ -0,0 +1,12 @@ +{ + "name": "proxy", + "version": "1.0.0", + "main": "index.js", + "scripts": { + "start": "node index.js", + "dev": "node --watch index.js" + }, + "dependencies": { + "fastify": "^5.2.1" + } +} diff --git a/examples/reconnection/unstable-target/index.js b/examples/reconnection/unstable-target/index.js new file mode 100644 index 0000000..3000049 --- /dev/null +++ b/examples/reconnection/unstable-target/index.js @@ -0,0 +1,75 @@ +'use strict' + +const { setTimeout: wait } = require('node:timers/promises') +const fastify = require('fastify') + +// unstable service + +async function main () { + const SLOW_START = process.env.SLOW_START || 2_000 + const UNSTABLE_MIN = process.env.UNSTABLE_MIN || 1_000 + const UNSTABLE_MAX = process.env.UNSTABLE_MAX || 10_000 + const BLOCK_TIME = process.env.BLOCK_TIME || 5_000 + + const app = fastify({ logger: true }) + + // slow start + + await wait(SLOW_START) + + app.register(require('@fastify/websocket')) + app.register(async function (app) { + app.get('/', { websocket: true }, (socket) => { + socket.on('message', message => { + let m = message.toString() + console.log('incoming message', m) + m = JSON.parse(m) + + socket.send(JSON.stringify({ + response: m.message + })) + }) + }) + }) + + try { + const port = process.env.PORT || 3000 + await app.listen({ port }) + } catch (err) { + app.log.error(err) + process.exit(1) + } + + if (process.env.STABLE) { + return + } + + function runProblem () { + const problem = process.env.PROBLEM || (Math.random() < 0.5 ? 'crash' : 'block') + const unstabilityTimeout = process.env.UNSTABLE_TIMEOUT || Math.round(UNSTABLE_MIN + Math.random() * (UNSTABLE_MAX - UNSTABLE_MIN)) + + if (problem === 'crash') { + console.log(`Restarting (crash and restart) in ${unstabilityTimeout}ms`) + setTimeout(() => { + console.log('UNHANDLED EXCEPTION') + throw new Error('UNHANDLED EXCEPTION') + }, unstabilityTimeout).unref() + } else { + console.log(`Blocking EL in ${unstabilityTimeout}ms for ${BLOCK_TIME}ms`) + + setTimeout(() => { + console.log('Block EL ...') + const start = performance.now() + while (performance.now() - start < BLOCK_TIME) { + // just block + } + console.log('Block ends') + runProblem() + }, unstabilityTimeout).unref() + } + } + + runProblem() +} + +main() diff --git a/examples/reconnection/unstable-target/package.json b/examples/reconnection/unstable-target/package.json new file mode 100644 index 0000000..4a33bd8 --- /dev/null +++ b/examples/reconnection/unstable-target/package.json @@ -0,0 +1,14 @@ +{ + "name": "unstable-target", + "version": "1.0.0", + "main": "index.js", + "scripts": { + "stable": "STABLE=1 node index.js", + "unstable": "forever index.js", + "dev": "node --watch index.js" + }, + "dependencies": { + "fastify": "^5.2.1", + "forever": "^4.0.3" + } +} diff --git a/index.js b/index.js index d355387..4f0e5af 100644 --- a/index.js +++ b/index.js @@ -1,10 +1,12 @@ 'use strict' +const { setTimeout: wait } = require('node:timers/promises') const From = require('@fastify/reply-from') const { ServerResponse } = require('node:http') const WebSocket = require('ws') const { convertUrlToWebSocket } = require('./utils') const fp = require('fastify-plugin') const qs = require('fast-querystring') +const { validateOptions } = require('./src/options') const httpMethods = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS'] const urlPattern = /^https?:\/\// @@ -27,6 +29,7 @@ function liftErrorCode (code) { } function closeWebSocket (socket, code, reason) { + socket.isAlive = false if (socket.readyState === WebSocket.OPEN) { socket.close(liftErrorCode(code), reason) } @@ -40,19 +43,70 @@ function waitConnection (socket, write) { } } +function waitForConnection (target, timeout) { + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + /* c8 ignore start */ + reject(new Error('WebSocket connection timeout')) + /* c8 ignore stop */ + }, timeout) + + /* c8 ignore start */ + if (target.readyState === WebSocket.OPEN) { + clearTimeout(timeoutId) + return resolve() + } + /* c8 ignore stop */ + + if (target.readyState === WebSocket.CONNECTING) { + target.once('open', () => { + clearTimeout(timeoutId) + resolve() + }) + target.once('error', (err) => { + clearTimeout(timeoutId) + reject(err) + }) + /* c8 ignore start */ + } else { + clearTimeout(timeoutId) + reject(new Error('WebSocket is closed')) + } + /* c8 ignore stop */ + }) +} + function isExternalUrl (url) { return urlPattern.test(url) } -function noop () {} +function noop () { } -function proxyWebSockets (source, target) { +function proxyWebSockets (logger, source, target, hooks) { function close (code, reason) { + if (hooks.onDisconnect) { + waitConnection(target, () => { + try { + hooks.onDisconnect(source) + } catch (err) { + logger.error({ err }, 'proxy ws error from onDisconnect hook') + } + }) + } closeWebSocket(source, code, reason) closeWebSocket(target, code, reason) } - source.on('message', (data, binary) => waitConnection(target, () => target.send(data, { binary }))) + source.on('message', (data, binary) => { + if (hooks.onIncomingMessage) { + try { + hooks.onIncomingMessage(source, target, { data, binary }) + } catch (err) { + logger.error({ err }, 'proxy ws error from onIncomingMessage hook') + } + } + waitConnection(target, () => target.send(data, { binary })) + }) /* c8 ignore start */ source.on('ping', data => waitConnection(target, () => target.ping(data))) source.on('pong', data => waitConnection(target, () => target.pong(data))) @@ -64,7 +118,16 @@ function proxyWebSockets (source, target) { /* c8 ignore stop */ // source WebSocket is already connected because it is created by ws server - target.on('message', (data, binary) => source.send(data, { binary })) + target.on('message', (data, binary) => { + if (hooks.onOutgoingMessage) { + try { + hooks.onOutgoingMessage(source, target, { data, binary }) + } catch (err) { + logger.error({ err }, 'proxy ws error from onOutgoingMessage hook') + } + } + source.send(data, { binary }) + }) /* c8 ignore start */ target.on('ping', data => source.ping(data)) /* c8 ignore stop */ @@ -74,6 +137,217 @@ function proxyWebSockets (source, target) { target.on('error', error => close(1011, error.message)) target.on('unexpected-response', () => close(1011, 'unexpected response')) /* c8 ignore stop */ + + if (hooks.onConnect) { + waitConnection(target, () => { + try { + hooks.onConnect(source, target) + } catch (err) { + logger.error({ err }, 'proxy ws error from onConnect hook') + } + }) + } +} + +async function reconnect (logger, source, reconnectOptions, hooks, targetParams) { + const { url, subprotocols, optionsWs } = targetParams + + let attempts = 0 + let target + do { + const reconnectWait = reconnectOptions.reconnectInterval * (reconnectOptions.reconnectDecay * attempts || 1) + reconnectOptions.logs && logger.warn({ target: targetParams.url }, `proxy ws reconnect in ${reconnectWait} ms`) + await wait(reconnectWait) + + try { + target = new WebSocket(url, subprotocols, optionsWs) + await waitForConnection(target, reconnectOptions.connectionTimeout) + } catch (err) { + reconnectOptions.logs && logger.error({ target: targetParams.url, err, attempts }, 'proxy ws reconnect error') + attempts++ + target = undefined + } + // stop if the source connection is closed during the reconnection + } while (source.isAlive && !target && attempts < reconnectOptions.maxReconnectionRetries) + + /* c8 ignore start */ + if (!source.isAlive) { + reconnectOptions.logs && logger.info({ target: targetParams.url, attempts }, 'proxy ws abort reconnect due to source is closed') + source.close() + return + } + /* c8 ignore stop */ + + if (!target) { + logger.error({ target: targetParams.url, attempts }, 'proxy ws failed to reconnect! No more retries') + source.close() + return + } + + reconnectOptions.logs && logger.info({ target: targetParams.url, attempts }, 'proxy ws reconnected') + proxyWebSocketsWithReconnection(logger, source, target, reconnectOptions, hooks, targetParams, true) +} + +function proxyWebSocketsWithReconnection (logger, source, target, options, hooks, targetParams, isReconnecting = false) { + function close (code, reason) { + target.pingTimer && clearInterval(target.pingTimer) + target.pingTimer = undefined + closeWebSocket(target, code, reason) + + // reconnect target as long as the source connection is active + if (source.isAlive && (target.broken || options.reconnectOnClose)) { + // clean up the target and related source listeners + target.isAlive = false + target.removeAllListeners() + + reconnect(logger, source, options, hooks, targetParams) + return + } + + if (hooks.onDisconnect) { + try { + hooks.onDisconnect(source) + } catch (err) { + options.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onDisconnect hook') + } + } + + options.logs && logger.info({ msg: 'proxy ws close link' }) + closeWebSocket(source, code, reason) + closeWebSocket(target, code, reason) + } + + function removeSourceListeners (source) { + source.off('message', sourceOnMessage) + source.off('ping', sourceOnPing) + source.off('pong', sourceOnPong) + source.off('close', sourceOnClose) + source.off('error', sourceOnError) + source.off('unexpected-response', sourceOnUnexpectedResponse) + } + + /* c8 ignore start */ + function sourceOnMessage (data, binary) { + source.isAlive = true + if (hooks.onIncomingMessage) { + try { + hooks.onIncomingMessage(source, target, { data, binary }) + } catch (err) { + logger.error({ target: targetParams.url, err }, 'proxy ws error from onIncomingMessage hook') + } + } + waitConnection(target, () => target.send(data, { binary })) + } + function sourceOnPing (data) { + source.isAlive = true + waitConnection(target, () => target.ping(data)) + } + function sourceOnPong (data) { + source.isAlive = true + waitConnection(target, () => target.pong(data)) + } + function sourceOnClose (code, reason) { + source.isAlive = false + options.logs && logger.warn({ target: targetParams.url, code, reason }, 'proxy ws source close event') + close(code, reason) + } + function sourceOnError (error) { + source.isAlive = false + options.logs && logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws source error event') + close(1011, error.message) + } + function sourceOnUnexpectedResponse () { + source.isAlive = false + options.logs && logger.warn({ target: targetParams.url }, 'proxy ws source unexpected-response event') + close(1011, 'unexpected response') + } + /* c8 ignore stop */ + + // need to specify the listeners to remove + removeSourceListeners(source) + // source is alive since it is created by the proxy service + // the pinger is not set since we can't reconnect from here + source.isAlive = true + source.on('message', sourceOnMessage) + source.on('ping', sourceOnPing) + source.on('pong', sourceOnPong) + source.on('close', sourceOnClose) + source.on('error', sourceOnError) + source.on('unexpected-response', sourceOnUnexpectedResponse) + + // source WebSocket is already connected because it is created by ws server + /* c8 ignore start */ + target.on('message', (data, binary) => { + target.isAlive = true + if (hooks.onOutgoingMessage) { + try { + hooks.onOutgoingMessage(source, target, { data, binary }) + } catch (err) { + logger.error({ target: targetParams.url, err }, 'proxy ws error from onOutgoingMessage hook') + } + } + source.send(data, { binary }) + }) + target.on('ping', data => { + target.isAlive = true + source.ping(data) + }) + target.on('pong', data => { + target.isAlive = true + if (hooks.onPong) { + try { + hooks.onPong(source, target) + } catch (err) { + logger.error({ target: targetParams.url, err }, 'proxy ws error from onPong hook') + } + } + source.pong(data) + }) + /* c8 ignore stop */ + target.on('close', (code, reason) => { + options.logs && logger.warn({ target: targetParams.url, code, reason }, 'proxy ws target close event') + close(code, reason) + }) + /* c8 ignore start */ + target.on('error', error => { + options.logs && logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws target error event') + close(1011, error.message) + }) + target.on('unexpected-response', () => { + options.logs && logger.warn({ target: targetParams.url }, 'proxy ws target unexpected-response event') + close(1011, 'unexpected response') + }) + /* c8 ignore stop */ + + waitConnection(target, () => { + target.isAlive = true + target.pingTimer = setInterval(() => { + if (target.isAlive === false) { + target.broken = true + options.logs && logger.warn({ target: targetParams.url }, 'proxy ws connection is broken') + target.pingTimer && clearInterval(target.pingTimer) + target.pingTimer = undefined + return target.terminate() + } + target.isAlive = false + target.ping() + }, options.pingInterval).unref() + + // call onConnect and onReconnect callbacks after the events are bound + if (isReconnecting && hooks.onReconnect) { + try { + hooks.onReconnect(source, target) + } catch (err) { + options.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onReconnect hook') + } + } else if (hooks.onConnect) { + try { + hooks.onConnect(source, target) + } catch (err) { + options.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onConnect hook') + } + } + }) } function handleUpgrade (fastify, rawRequest, socket, head) { @@ -91,7 +365,7 @@ function handleUpgrade (fastify, rawRequest, socket, head) { } class WebSocketProxy { - constructor (fastify, { wsServerOptions, wsClientOptions, upstream, wsUpstream, replyOptions: { getUpstream } = {} }) { + constructor (fastify, { wsReconnect, wsHooks, wsServerOptions, wsClientOptions, upstream, wsUpstream, replyOptions: { getUpstream } = {} }) { this.logger = fastify.log this.wsClientOptions = { rewriteRequestHeaders: defaultWsHeadersRewrite, @@ -101,7 +375,8 @@ class WebSocketProxy { this.upstream = upstream ? convertUrlToWebSocket(upstream) : '' this.wsUpstream = wsUpstream ? convertUrlToWebSocket(wsUpstream) : '' this.getUpstream = getUpstream - + this.wsReconnect = wsReconnect + this.wsHooks = wsHooks const wss = new WebSocket.Server({ noServer: true, ...wsServerOptions @@ -190,7 +465,13 @@ class WebSocketProxy { const target = new WebSocket(url, subprotocols, optionsWs) this.logger.debug({ url: url.href }, 'proxy websocket') - proxyWebSockets(source, target) + + if (this.wsReconnect) { + const targetParams = { url, subprotocols, optionsWs } + proxyWebSocketsWithReconnection(this.logger, source, target, this.wsReconnect, this.wsHooks, targetParams) + } else { + proxyWebSockets(this.logger, source, target, this.wsHooks) + } } } @@ -228,9 +509,7 @@ function generateRewritePrefix (prefix, opts) { } async function fastifyHttpProxy (fastify, opts) { - if (!opts.upstream && !opts.websocket && !((opts.upstream === '' || opts.wsUpstream === '') && opts.replyOptions && typeof opts.replyOptions.getUpstream === 'function')) { - throw new Error('upstream must be specified') - } + opts = validateOptions(opts) const preHandler = opts.preHandler || opts.beforeHandler const rewritePrefix = generateRewritePrefix(fastify.prefix, opts) diff --git a/package.json b/package.json index 7ce674e..1e9fdcc 100644 --- a/package.json +++ b/package.json @@ -72,6 +72,8 @@ "http-errors": "^2.0.0", "http-proxy": "^1.18.1", "neostandard": "^0.12.0", + "pino": "^9.6.0", + "pino-test": "^1.1.0", "simple-get": "^4.0.1", "socket.io": "^4.7.5", "socket.io-client": "^4.7.5", diff --git a/src/options.js b/src/options.js new file mode 100644 index 0000000..658bdc5 --- /dev/null +++ b/src/options.js @@ -0,0 +1,94 @@ +'use strict' + +const DEFAULT_PING_INTERVAL = 30_000 +const DEFAULT_MAX_RECONNECTION_RETRIES = Infinity +const DEFAULT_RECONNECT_INTERVAL = 1_000 +const DEFAULT_RECONNECT_DECAY = 1.5 +const DEFAULT_CONNECTION_TIMEOUT = 5_000 +const DEFAULT_RECONNECT_ON_CLOSE = false +const DEFAULT_LOGS = false + +function validateOptions (options) { + if (!options.upstream && !options.websocket && !((options.upstream === '' || options.wsUpstream === '') && options.replyOptions && typeof options.replyOptions.getUpstream === 'function')) { + throw new Error('upstream must be specified') + } + + if (options.wsReconnect) { + const wsReconnect = options.wsReconnect + + if (wsReconnect.pingInterval !== undefined && (typeof wsReconnect.pingInterval !== 'number' || wsReconnect.pingInterval < 0)) { + throw new Error('wsReconnect.pingInterval must be a non-negative number') + } + wsReconnect.pingInterval = wsReconnect.pingInterval ?? DEFAULT_PING_INTERVAL + + if (wsReconnect.maxReconnectionRetries !== undefined && (typeof wsReconnect.maxReconnectionRetries !== 'number' || wsReconnect.maxReconnectionRetries < 1)) { + throw new Error('wsReconnect.maxReconnectionRetries must be a number greater than or equal to 1') + } + wsReconnect.maxReconnectionRetries = wsReconnect.maxReconnectionRetries ?? DEFAULT_MAX_RECONNECTION_RETRIES + + if (wsReconnect.reconnectInterval !== undefined && (typeof wsReconnect.reconnectInterval !== 'number' || wsReconnect.reconnectInterval < 100)) { + throw new Error('wsReconnect.reconnectInterval (ms) must be a number greater than or equal to 100') + } + wsReconnect.reconnectInterval = wsReconnect.reconnectInterval ?? DEFAULT_RECONNECT_INTERVAL + + if (wsReconnect.reconnectDecay !== undefined && (typeof wsReconnect.reconnectDecay !== 'number' || wsReconnect.reconnectDecay < 1)) { + throw new Error('wsReconnect.reconnectDecay must be a number greater than or equal to 1') + } + wsReconnect.reconnectDecay = wsReconnect.reconnectDecay ?? DEFAULT_RECONNECT_DECAY + + if (wsReconnect.connectionTimeout !== undefined && (typeof wsReconnect.connectionTimeout !== 'number' || wsReconnect.connectionTimeout < 0)) { + throw new Error('wsReconnect.connectionTimeout must be a non-negative number') + } + wsReconnect.connectionTimeout = wsReconnect.connectionTimeout ?? DEFAULT_CONNECTION_TIMEOUT + + if (wsReconnect.reconnectOnClose !== undefined && typeof wsReconnect.reconnectOnClose !== 'boolean') { + throw new Error('wsReconnect.reconnectOnClose must be a boolean') + } + wsReconnect.reconnectOnClose = wsReconnect.reconnectOnClose ?? DEFAULT_RECONNECT_ON_CLOSE + + if (wsReconnect.logs !== undefined && typeof wsReconnect.logs !== 'boolean') { + throw new Error('wsReconnect.logs must be a boolean') + } + wsReconnect.logs = wsReconnect.logs ?? DEFAULT_LOGS + } + + if (options.wsHooks) { + const wsHooks = options.wsHooks + + if (wsHooks.onReconnect !== undefined && typeof wsHooks.onReconnect !== 'function') { + throw new Error('wsHooks.onReconnect must be a function') + } + + if (wsHooks.onIncomingMessage !== undefined && typeof wsHooks.onIncomingMessage !== 'function') { + throw new Error('wsHooks.onIncomingMessage must be a function') + } + + if (wsHooks.onOutgoingMessage !== undefined && typeof wsHooks.onOutgoingMessage !== 'function') { + throw new Error('wsHooks.onOutgoingMessage must be a function') + } + + if (wsHooks.onPong !== undefined && typeof wsHooks.onPong !== 'function') { + throw new Error('wsHooks.onPong must be a function') + } + } else { + options.wsHooks = { + onReconnect: undefined, + onIncomingMessage: undefined, + onOutgoingMessage: undefined, + onPong: undefined, + } + } + + return options +} + +module.exports = { + validateOptions, + DEFAULT_PING_INTERVAL, + DEFAULT_MAX_RECONNECTION_RETRIES, + DEFAULT_RECONNECT_INTERVAL, + DEFAULT_RECONNECT_DECAY, + DEFAULT_CONNECTION_TIMEOUT, + DEFAULT_RECONNECT_ON_CLOSE, + DEFAULT_LOGS, +} diff --git a/test/helper/helper.js b/test/helper/helper.js new file mode 100644 index 0000000..c216fb1 --- /dev/null +++ b/test/helper/helper.js @@ -0,0 +1,83 @@ +'use strict' + +const { createServer } = require('node:http') +const { promisify } = require('node:util') +const { once } = require('node:events') +const Fastify = require('fastify') +const WebSocket = require('ws') +const pinoTest = require('pino-test') +const pino = require('pino') +const proxyPlugin = require('../../') + +function waitForLogMessage (loggerSpy, message, max = 100) { + return new Promise((resolve, reject) => { + let count = 0 + const fn = (received) => { + if (received.msg === message) { + loggerSpy.off('data', fn) + resolve() + } + count++ + if (count > max) { + loggerSpy.off('data', fn) + reject(new Error(`Max message count reached on waitForLogMessage: ${message}`)) + } + } + loggerSpy.on('data', fn) + }) +} + +async function createTargetServer (t, wsTargetOptions, port = 0) { + const targetServer = createServer() + const targetWs = new WebSocket.Server({ server: targetServer, ...wsTargetOptions }) + await promisify(targetServer.listen.bind(targetServer))({ port, host: '127.0.0.1' }) + + t.after(() => { + targetWs.close() + targetServer.close() + }) + + return { targetServer, targetWs } +} + +async function createServices ({ t, wsReconnectOptions, wsTargetOptions, wsServerOptions, wsHooks, targetPort = 0 }) { + const { targetServer, targetWs } = await createTargetServer(t, wsTargetOptions, targetPort) + + const loggerSpy = pinoTest.sink() + const logger = pino(loggerSpy) + const proxy = Fastify({ loggerInstance: logger }) + proxy.register(proxyPlugin, { + upstream: `ws://127.0.0.1:${targetServer.address().port}`, + websocket: true, + wsReconnect: wsReconnectOptions, + wsServerOptions, + wsHooks + }) + + await proxy.listen({ port: 0, host: '127.0.0.1' }) + + const client = new WebSocket(`ws://127.0.0.1:${proxy.server.address().port}`) + await once(client, 'open') + + t.after(async () => { + client.close() + await proxy.close() + }) + + return { + target: { + ws: targetWs, + server: targetServer + }, + proxy, + client, + loggerSpy, + logger + } +} + +module.exports = { + waitForLogMessage, + createTargetServer, + createServices +} diff --git a/test/options.js b/test/options.js new file mode 100644 index 0000000..fea5bf5 --- /dev/null +++ b/test/options.js @@ -0,0 +1,97 @@ +'use strict' + +const { test } = require('node:test') +const assert = require('node:assert') +const { validateOptions } = require('../src/options') +const { + DEFAULT_PING_INTERVAL, DEFAULT_MAX_RECONNECTION_RETRIES, DEFAULT_RECONNECT_INTERVAL, DEFAULT_RECONNECT_DECAY, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_RECONNECT_ON_CLOSE, DEFAULT_LOGS +} = require('../src/options') + +test('validateOptions', (t) => { + const requiredOptions = { + upstream: 'someUpstream' + } + + assert.throws(() => validateOptions({}), /upstream must be specified/) + + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { pingInterval: -1 } }), /wsReconnect.pingInterval must be a non-negative number/) + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { pingInterval: '1' } }), /wsReconnect.pingInterval must be a non-negative number/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsReconnect: { pingInterval: 1 } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { maxReconnectionRetries: 0 } }), /wsReconnect.maxReconnectionRetries must be a number greater than or equal to 1/) + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { maxReconnectionRetries: -1 } }), /wsReconnect.maxReconnectionRetries must be a number greater than or equal to 1/) + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { maxReconnectionRetries: '1' } }), /wsReconnect.maxReconnectionRetries must be a number greater than or equal to 1/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsReconnect: { maxReconnectionRetries: 1 } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectInterval: 0 } }), /wsReconnect.reconnectInterval \(ms\) must be a number greater than or equal to 100/) + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectInterval: -1 } }), /wsReconnect.reconnectInterval \(ms\) must be a number greater than or equal to 100/) + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectInterval: '1' } }), /wsReconnect.reconnectInterval \(ms\) must be a number greater than or equal to 100/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectInterval: 100 } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectDecay: 0 } }), /wsReconnect.reconnectDecay must be a number greater than or equal to 1/) + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectDecay: -1 } }), /wsReconnect.reconnectDecay must be a number greater than or equal to 1/) + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectDecay: '1' } }), /wsReconnect.reconnectDecay must be a number greater than or equal to 1/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectDecay: 1 } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { connectionTimeout: -1 } }), /wsReconnect.connectionTimeout must be a non-negative number/) + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { connectionTimeout: '1' } }), /wsReconnect.connectionTimeout must be a non-negative number/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsReconnect: { connectionTimeout: 1 } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectOnClose: '1' } }), /wsReconnect.reconnectOnClose must be a boolean/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsReconnect: { reconnectOnClose: true } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsReconnect: { logs: '1' } }), /wsReconnect.logs must be a boolean/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsReconnect: { logs: true } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsHooks: { onReconnect: '1' } }), /wsHooks.onReconnect must be a function/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsHooks: { onReconnect: () => { } } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsHooks: { onIncomingMessage: '1' } }), /wsHooks.onIncomingMessage must be a function/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsHooks: { onIncomingMessage: () => { } } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsHooks: { onOutgoingMessage: '1' } }), /wsHooks.onOutgoingMessage must be a function/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsHooks: { onOutgoingMessage: () => { } } })) + + assert.throws(() => validateOptions({ ...requiredOptions, wsHooks: { onPong: '1' } }), /wsHooks.onPong must be a function/) + assert.doesNotThrow(() => validateOptions({ ...requiredOptions, wsHooks: { onPong: () => { } } })) + + // set all values + assert.doesNotThrow(() => validateOptions({ + ...requiredOptions, + wsReconnect: { + pingInterval: 1, + maxReconnectionRetries: 1, + reconnectInterval: 100, + reconnectDecay: 1, + connectionTimeout: 1, + reconnectOnClose: true, + logs: true, + }, + wsHooks: { + onReconnect: () => { }, + onIncomingMessage: () => { }, + onOutgoingMessage: () => { }, + onPong: () => { } + } + })) + + // get default values + assert.deepEqual(validateOptions({ ...requiredOptions, wsReconnect: {} }), { + ...requiredOptions, + wsReconnect: { + pingInterval: DEFAULT_PING_INTERVAL, + maxReconnectionRetries: DEFAULT_MAX_RECONNECTION_RETRIES, + reconnectInterval: DEFAULT_RECONNECT_INTERVAL, + reconnectDecay: DEFAULT_RECONNECT_DECAY, + connectionTimeout: DEFAULT_CONNECTION_TIMEOUT, + reconnectOnClose: DEFAULT_RECONNECT_ON_CLOSE, + logs: DEFAULT_LOGS, + }, + wsHooks: { + onReconnect: undefined, + onIncomingMessage: undefined, + onOutgoingMessage: undefined, + onPong: undefined, + } + }) +}) diff --git a/test/websocket.js b/test/websocket.js index ba7e125..7c10e97 100644 --- a/test/websocket.js +++ b/test/websocket.js @@ -1,12 +1,14 @@ 'use strict' const { test } = require('node:test') +const assert = require('node:assert') const Fastify = require('fastify') const proxy = require('../') const WebSocket = require('ws') const { createServer } = require('node:http') const { promisify } = require('node:util') const { once } = require('node:events') +const { waitForLogMessage, createServices } = require('./helper/helper') const cookieValue = 'foo=bar' const subprotocolValue = 'foo-subprotocol' @@ -710,3 +712,101 @@ test('multiple websocket upstreams with distinct server options', async (t) => { server.close() ]) }) + +test('should call onIncomingMessage and onOutgoingMessage hooks', async (t) => { + const request = 'query () { ... }' + const response = 'data ...' + const onIncomingMessage = (source, target, { data, binary }) => { + assert.strictEqual(data.toString(), request) + assert.strictEqual(binary, false) + logger.info('onIncomingMessage called') + } + const onOutgoingMessage = (source, target, { data, binary }) => { + assert.strictEqual(data.toString(), response) + assert.strictEqual(binary, false) + logger.info('onOutgoingMessage called') + } + + const { target, loggerSpy, logger, client } = await createServices({ t, wsHooks: { onIncomingMessage, onOutgoingMessage } }) + + target.ws.on('connection', async (socket) => { + socket.on('message', async (data, binary) => { + socket.send(response, { binary }) + }) + }) + + client.send(request) + + await waitForLogMessage(loggerSpy, 'onIncomingMessage called') + await waitForLogMessage(loggerSpy, 'onOutgoingMessage called') +}) + +test('should handle throwing an error in onIncomingMessage and onOutgoingMessage hooks', async (t) => { + const request = 'query () { ... }' + const response = 'data ...' + const onIncomingMessage = (source, target, { data, binary }) => { + assert.strictEqual(data.toString(), request) + assert.strictEqual(binary, false) + throw new Error('onIncomingMessage error') + } + const onOutgoingMessage = (source, target, { data, binary }) => { + assert.strictEqual(data.toString(), response) + assert.strictEqual(binary, false) + throw new Error('onOutgoingMessage error') + } + + const { target, loggerSpy, client } = await createServices({ t, wsHooks: { onIncomingMessage, onOutgoingMessage } }) + + target.ws.on('connection', async (socket) => { + socket.on('message', async (data, binary) => { + socket.send(response, { binary }) + }) + }) + + client.send(request) + + await waitForLogMessage(loggerSpy, 'proxy ws error from onIncomingMessage hook') + await waitForLogMessage(loggerSpy, 'proxy ws error from onOutgoingMessage hook') +}) + +test('should call onConnect hook', async (t) => { + const onConnect = () => { + logger.info('onConnect called') + } + + const { loggerSpy, logger } = await createServices({ t, wsHooks: { onConnect } }) + + await waitForLogMessage(loggerSpy, 'onConnect called') +}) + +test('should handle throwing an error in onConnect hook', async (t) => { + const onConnect = () => { + throw new Error('onConnect error') + } + + const { loggerSpy } = await createServices({ t, wsHooks: { onConnect } }) + + await waitForLogMessage(loggerSpy, 'proxy ws error from onConnect hook') +}) + +test('should call onDisconnect hook', async (t) => { + const onDisconnect = () => { + logger.info('onDisconnect called') + } + + const { loggerSpy, logger, client } = await createServices({ t, wsHooks: { onDisconnect } }) + client.close() + + await waitForLogMessage(loggerSpy, 'onDisconnect called') +}) + +test('should handle throwing an error in onDisconnect hook', async (t) => { + const onDisconnect = () => { + throw new Error('onDisconnect error') + } + + const { loggerSpy, client } = await createServices({ t, wsHooks: { onDisconnect } }) + client.close() + + await waitForLogMessage(loggerSpy, 'proxy ws error from onDisconnect hook') +}) diff --git a/test/ws-reconnect.js b/test/ws-reconnect.js new file mode 100644 index 0000000..56f34b7 --- /dev/null +++ b/test/ws-reconnect.js @@ -0,0 +1,327 @@ +'use strict' + +const { test } = require('node:test') +const assert = require('node:assert') +const { setTimeout: wait } = require('node:timers/promises') +const { waitForLogMessage, createTargetServer, createServices } = require('./helper/helper') + +test('should use ping/pong to verify connection is alive - from source (server on proxy) to target', async (t) => { + const wsReconnectOptions = { pingInterval: 100, reconnectInterval: 100, maxReconnectionRetries: 1 } + + const { target } = await createServices({ t, wsReconnectOptions }) + + let counter = 0 + target.ws.on('connection', function connection (socket) { + socket.on('ping', () => { + counter++ + }) + }) + + await wait(250) + + assert.ok(counter > 0) +}) + +test('should reconnect on broken connection', async (t) => { + const wsReconnectOptions = { pingInterval: 500, reconnectInterval: 100, maxReconnectionRetries: 1, reconnectDecay: 2, logs: true } + + const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsTargetOptions: { autoPong: false } }) + + let breakConnection = true + target.ws.on('connection', async (socket) => { + socket.on('ping', async () => { + // add latency to break the connection once + if (breakConnection) { + await wait(wsReconnectOptions.pingInterval * 2) + breakConnection = false + } + socket.pong() + }) + }) + + await waitForLogMessage(loggerSpy, 'proxy ws connection is broken') + await waitForLogMessage(loggerSpy, 'proxy ws target close event') + await waitForLogMessage(loggerSpy, 'proxy ws reconnected') +}) + +test('should not reconnect after max retries', async (t) => { + const wsReconnectOptions = { pingInterval: 150, reconnectInterval: 100, maxReconnectionRetries: 1, logs: true } + + const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsTargetOptions: { autoPong: false } }) + + let breakConnection = true + + target.ws.on('connection', async (socket) => { + socket.on('ping', async () => { + // add latency to break the connection once + if (breakConnection) { + await wait(wsReconnectOptions.pingInterval * 2) + breakConnection = false + } + socket.pong() + }) + }) + + await waitForLogMessage(loggerSpy, 'proxy ws connection is broken') + + target.ws.close() + target.server.close() + + await waitForLogMessage(loggerSpy, 'proxy ws target close event') + await waitForLogMessage(loggerSpy, 'proxy ws reconnect error') + await waitForLogMessage(loggerSpy, 'proxy ws failed to reconnect! No more retries') +}) + +test('should not reconnect when the target connection is closed and reconnectOnClose is off', async (t) => { + const wsReconnectOptions = { pingInterval: 200, reconnectInterval: 100, maxReconnectionRetries: 1, reconnectOnClose: false, logs: true } + + const { target, loggerSpy } = await createServices({ t, wsReconnectOptions }) + + target.ws.on('connection', async (socket) => { + socket.on('ping', async () => { + socket.pong() + }) + + await wait(500) + socket.close() + }) + + await waitForLogMessage(loggerSpy, 'proxy ws target close event') + await waitForLogMessage(loggerSpy, 'proxy ws close link') +}) + +test('should reconnect retrying after a few failures', async (t) => { + const wsReconnectOptions = { pingInterval: 150, reconnectInterval: 100, reconnectDecay: 2, logs: true, maxReconnectionRetries: Infinity } + + const wsTargetOptions = { autoPong: false } + const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsTargetOptions }) + + let breakConnection = true + + target.ws.on('connection', async (socket) => { + socket.on('ping', async () => { + // add latency to break the connection once + if (breakConnection) { + await wait(wsReconnectOptions.pingInterval * 2) + breakConnection = false + } + socket.pong() + }) + }) + + await waitForLogMessage(loggerSpy, 'proxy ws connection is broken') + + // recreate a new target + const targetPort = target.server.address().port + target.ws.close() + target.server.close() + + await waitForLogMessage(loggerSpy, 'proxy ws target close event') + // make reconnection fail 2 times + await waitForLogMessage(loggerSpy, 'proxy ws reconnect error') + await waitForLogMessage(loggerSpy, 'proxy ws reconnect in 200 ms') + + // recreate the target + await createTargetServer(t, { autoPong: true }, targetPort) + await waitForLogMessage(loggerSpy, 'proxy ws reconnected') +}) + +test('should reconnect when the target connection is closed gracefully and reconnectOnClose is on', async (t) => { + const wsReconnectOptions = { pingInterval: 200, reconnectInterval: 100, maxReconnectionRetries: 1, reconnectOnClose: true, logs: true } + + const { target, loggerSpy } = await createServices({ t, wsReconnectOptions }) + + target.ws.on('connection', async (socket) => { + socket.on('ping', async () => { + socket.pong() + }) + + await wait(500) + socket.close() + }) + + await waitForLogMessage(loggerSpy, 'proxy ws target close event') + await waitForLogMessage(loggerSpy, 'proxy ws reconnected') +}) + +test('should call onReconnect hook when the connection is reconnected', async (t) => { + const onReconnect = (source, target) => { + logger.info('onReconnect called') + } + const wsReconnectOptions = { + pingInterval: 100, + reconnectInterval: 100, + maxReconnectionRetries: 1, + reconnectOnClose: true, + logs: true, + } + + const { target, loggerSpy, logger } = await createServices({ t, wsReconnectOptions, wsHooks: { onReconnect } }) + + target.ws.on('connection', async (socket) => { + socket.on('ping', async () => { + socket.pong() + }) + + await wait(500) + socket.close() + }) + + await waitForLogMessage(loggerSpy, 'proxy ws target close event') + await waitForLogMessage(loggerSpy, 'proxy ws reconnected') + await waitForLogMessage(loggerSpy, 'onReconnect called') +}) + +test('should handle throwing an error in onReconnect hook', async (t) => { + const onReconnect = (source, target) => { + throw new Error('onReconnect error') + } + const wsReconnectOptions = { + pingInterval: 100, + reconnectInterval: 100, + maxReconnectionRetries: 1, + reconnectOnClose: true, + logs: true, + } + + const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsHooks: { onReconnect } }) + + target.ws.on('connection', async (socket) => { + socket.on('ping', async () => { + socket.pong() + }) + + await wait(500) + socket.close() + }) + + await waitForLogMessage(loggerSpy, 'proxy ws target close event') + await waitForLogMessage(loggerSpy, 'proxy ws reconnected') + await waitForLogMessage(loggerSpy, 'proxy ws error from onReconnect hook') +}) + +test('should call onIncomingMessage and onOutgoingMessage hooks, with reconnection', async (t) => { + const request = 'query () { ... }' + const response = 'data ...' + const onIncomingMessage = (source, target, { data, binary }) => { + assert.strictEqual(data.toString(), request) + assert.strictEqual(binary, false) + logger.info('onIncomingMessage called') + } + const onOutgoingMessage = (source, target, { data, binary }) => { + assert.strictEqual(data.toString(), response) + assert.strictEqual(binary, false) + logger.info('onOutgoingMessage called') + } + const wsReconnectOptions = { + pingInterval: 100, + reconnectInterval: 100, + maxReconnectionRetries: 1, + logs: true, + } + + const { target, loggerSpy, logger, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onIncomingMessage, onOutgoingMessage } }) + + target.ws.on('connection', async (socket) => { + socket.on('message', async (data, binary) => { + socket.send(response, { binary }) + }) + }) + + client.send(request) + + await waitForLogMessage(loggerSpy, 'onIncomingMessage called') + await waitForLogMessage(loggerSpy, 'onOutgoingMessage called') +}) + +test('should handle throwing an error in onIncomingMessage and onOutgoingMessage hooks, with reconnection', async (t) => { + const request = 'query () { ... }' + const response = 'data ...' + const onIncomingMessage = ({ data, binary }) => { + assert.strictEqual(data.toString(), request) + assert.strictEqual(binary, false) + throw new Error('onIncomingMessage error') + } + const onOutgoingMessage = ({ data, binary }) => { + assert.strictEqual(data.toString(), response) + assert.strictEqual(binary, false) + throw new Error('onOutgoingMessage error') + } + const wsReconnectOptions = { + pingInterval: 100, + reconnectInterval: 100, + maxReconnectionRetries: 1, + logs: true, + } + + const { target, loggerSpy, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onIncomingMessage, onOutgoingMessage } }) + + target.ws.on('connection', async (socket) => { + socket.on('message', async (data, binary) => { + socket.send(response, { binary }) + }) + }) + + client.send(request) + + await waitForLogMessage(loggerSpy, 'proxy ws error from onIncomingMessage hook') + await waitForLogMessage(loggerSpy, 'proxy ws error from onOutgoingMessage hook') +}) + +test('should call onConnect hook', async (t) => { + const onConnect = () => { + logger.info('onConnect called') + } + + const wsReconnectOptions = { + logs: true, + } + + const { loggerSpy, logger } = await createServices({ t, wsReconnectOptions, wsHooks: { onConnect } }) + + await waitForLogMessage(loggerSpy, 'onConnect called') +}) + +test('should handle throwing an error in onConnect hook', async (t) => { + const onConnect = () => { + throw new Error('onConnect error') + } + + const wsReconnectOptions = { + logs: true, + } + + const { loggerSpy } = await createServices({ t, wsReconnectOptions, wsHooks: { onConnect } }) + + await waitForLogMessage(loggerSpy, 'proxy ws error from onConnect hook') +}) + +test('should call onDisconnect hook', async (t) => { + const onDisconnect = () => { + logger.info('onDisconnect called') + } + + const wsReconnectOptions = { + logs: true, + } + + const { loggerSpy, logger, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onDisconnect } }) + client.close() + + await waitForLogMessage(loggerSpy, 'onDisconnect called') +}) + +test('should handle throwing an error in onDisconnect hook', async (t) => { + const onDisconnect = () => { + throw new Error('onDisconnect error') + } + + const wsReconnectOptions = { + logs: true, + } + + const { loggerSpy, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onDisconnect } }) + client.close() + + await waitForLogMessage(loggerSpy, 'proxy ws error from onDisconnect hook') +})