diff --git a/src/encoders/json_encoder.ts b/src/encoders/json_encoder.ts index a537e81..a30a070 100644 --- a/src/encoders/json_encoder.ts +++ b/src/encoders/json_encoder.ts @@ -12,7 +12,7 @@ export class JsonEncoder implements TransportEncoder { return JSON.stringify(message) } - decode(data: string) { - return JSON.parse(data) + decode(data: string | Buffer) { + return JSON.parse(data.toString()) } } diff --git a/src/transports/redis.ts b/src/transports/redis.ts index c3cea8d..c8c04c9 100644 --- a/src/transports/redis.ts +++ b/src/transports/redis.ts @@ -5,8 +5,9 @@ * @copyright Boring Node */ +import { Redis } from 'ioredis' import { assert } from '@poppinss/utils/assert' -import { Redis, type RedisOptions } from 'ioredis' + import debug from '../debug.js' import { JsonEncoder } from '../encoders/json_encoder.js' import type { @@ -26,18 +27,22 @@ export class RedisTransport implements Transport { readonly #publisher: Redis readonly #subscriber: Redis readonly #encoder: TransportEncoder + readonly #useMessageBuffer: boolean = false #id: string | undefined constructor(path: string, encoder?: TransportEncoder) - constructor(options: RedisOptions, encoder?: TransportEncoder) - constructor(options: RedisOptions | string, encoder?: TransportEncoder) - constructor(options: RedisOptions | string, encoder?: TransportEncoder) { + constructor(options: RedisTransportConfig, encoder?: TransportEncoder) + constructor(options: RedisTransportConfig | string, encoder?: TransportEncoder) { // @ts-expect-error - merged definitions of overloaded constructor is not public this.#publisher = new Redis(options) // @ts-expect-error - merged definitions of overloaded constructor is not public this.#subscriber = new Redis(options) this.#encoder = encoder ?? new JsonEncoder() + + if (typeof options === 'object') { + this.#useMessageBuffer = options.useMessageBuffer ?? false + } } setId(id: string): Transport { @@ -69,7 +74,10 @@ export class RedisTransport implements Transport { } }) - this.#subscriber.on('message', (receivedChannel, message) => { + const event = this.#useMessageBuffer ? 'messageBuffer' : 'message' + this.#subscriber.on(event, (receivedChannel: Buffer | string, message: Buffer | string) => { + receivedChannel = receivedChannel.toString() + if (channel !== receivedChannel) return debug('received message for channel "%s"', channel) diff --git a/src/types/main.ts b/src/types/main.ts index 42f01b8..ee22ecd 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -27,7 +27,14 @@ export interface TransportConfig { retryQueue?: RetryQueueOptions } -export interface RedisTransportConfig extends RedisOptions {} +export interface RedisTransportConfig extends RedisOptions { + /** + * If true, we will use `messageBuffer` event instead of `message` event + * that is emitted by ioredis. `messageBuffer` will returns a buffer instead + * of a string and this is useful when you are dealing with binary data. + */ + useMessageBuffer?: boolean +} export interface Transport { setId: (id: string) => Transport @@ -48,7 +55,7 @@ export interface TransportMessage { export interface TransportEncoder { encode: (message: TransportMessage) => string | Buffer - decode: (data: string) => { busId: string; payload: T } + decode: (data: string | Buffer) => { busId: string; payload: T } } export interface RetryQueueOptions { diff --git a/tests/drivers/redis_transport.spec.ts b/tests/drivers/redis_transport.spec.ts index 499dc90..664fcf8 100644 --- a/tests/drivers/redis_transport.spec.ts +++ b/tests/drivers/redis_transport.spec.ts @@ -10,6 +10,7 @@ import { test } from '@japa/runner' import { RedisContainer, StartedRedisContainer } from '@testcontainers/redis' import { RedisTransport } from '../../src/transports/redis.js' import { JsonEncoder } from '../../src/encoders/json_encoder.js' +import { TransportEncoder, TransportMessage } from '../../src/types/main.js' test.group('Redis Transport', (group) => { let container: StartedRedisContainer @@ -95,4 +96,44 @@ test.group('Redis Transport', (group) => { await transport.publish('testing-channel', data) }) + + test('send binary data using useMessageBuffer', async ({ assert, cleanup }, done) => { + assert.plan(1) + + class BinaryEncoder implements TransportEncoder { + encode(message: TransportMessage) { + return Buffer.from(JSON.stringify(message)) + } + + decode(data: string | Buffer) { + const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary') + return JSON.parse(buffer.toString()) + } + } + + const transport1 = new RedisTransport( + { host: container.getHost(), port: container.getMappedPort(6379), useMessageBuffer: true }, + new BinaryEncoder() + ).setId('bus1') + + const transport2 = new RedisTransport( + { host: container.getHost(), port: container.getMappedPort(6379), useMessageBuffer: true }, + new BinaryEncoder() + ).setId('bus2') + + cleanup(() => { + transport1.disconnect() + transport2.disconnect() + }) + + const data = ['foo', '👍'] + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + await transport2.publish('testing-channel', data) + }).waitForDone() })