From 8226b62f6df4212aca3418bdbc7d1249fc62c0c2 Mon Sep 17 00:00:00 2001 From: MaximeMRF Date: Sat, 25 May 2024 16:10:18 +0800 Subject: [PATCH 1/5] feat(transport): add mqtt --- package.json | 2 + src/transports/mqtt.ts | 95 +++++++++++++++++++ src/types/main.ts | 18 ++++ tests/drivers/mqtt_transport.spec.ts | 134 +++++++++++++++++++++++++++ 4 files changed, 249 insertions(+) create mode 100644 src/transports/mqtt.ts create mode 100644 tests/drivers/mqtt_transport.spec.ts diff --git a/package.json b/package.json index 399fc23..2dce5b2 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "@japa/expect-type": "^2.0.2", "@japa/runner": "^3.1.4", "@swc/core": "^1.5.7", + "@testcontainers/hivemq": "^10.9.0", "@testcontainers/redis": "^10.9.0", "@types/node": "^20.12.12", "@types/object-hash": "^3.0.6", @@ -43,6 +44,7 @@ "del-cli": "^5.1.0", "eslint": "^8.57.0", "ioredis": "^5.4.1", + "mqtt": "^5.6.1", "prettier": "^3.2.5", "release-it": "^17.2.1", "ts-node": "^10.9.2", diff --git a/src/transports/mqtt.ts b/src/transports/mqtt.ts new file mode 100644 index 0000000..23da354 --- /dev/null +++ b/src/transports/mqtt.ts @@ -0,0 +1,95 @@ +/** + * @boringnode/bus + * + * @license MIT + * @copyright Boring Node + */ + +import Mqtt from 'mqtt' +import { assert } from '@poppinss/utils/assert' + +import debug from '../debug.js' +import { + Transport, + TransportEncoder, + TransportMessage, + Serializable, + SubscribeHandler, + MqttProtocol, + MqttTransportConfig, +} from '../types/main.js' +import { JsonEncoder } from '../encoders/json_encoder.js' + +export function mqtt(config: MqttTransportConfig) { + return () => new MqttTransport(config) +} + +export class MqttTransport implements Transport { + #id: string | undefined + #client: any + #url: string + readonly #encoder: TransportEncoder + + constructor(options: MqttTransportConfig, encoder?: TransportEncoder) { + this.#encoder = encoder ?? new JsonEncoder() + this.#url = `${options.protocol || MqttProtocol.MQTT}://${options.host}${options.port ? `:${options.port}` : ''}` + + this.#client = Mqtt.connect(this.#url) + } + + setId(id: string): Transport { + this.#id = id + + return this + } + + async disconnect(): Promise { + await this.#client.endAsync() + } + + async publish(channel: string, message: any): Promise { + assert(this.#id, 'You must set an id before publishing a message') + + const encoded = this.#encoder.encode({ payload: message, busId: this.#id }) + + await this.#client.publishAsync(channel, encoded) + } + + async subscribe( + channel: string, + handler: SubscribeHandler + ): Promise { + this.#client.subscribe(channel, (err) => { + if (err) { + throw err + } + }) + + this.#client.on('message', (receivedChannel: string, message: Buffer | string) => { + if (channel !== receivedChannel) return + + debug('received message for channel "%s"', channel) + + const data = this.#encoder.decode>(message) + + /** + * Ignore messages published by this bus instance + */ + if (data.busId === this.#id) { + debug('ignoring message published by the same bus instance') + return + } + + // @ts-expect-error - TODO: Weird typing issue + handler(data.payload) + }) + } + + onReconnect(): void { + this.#client.reconnect(this.#url) + } + + async unsubscribe(channel: string): Promise { + await this.#client.unsubscribeAsync(channel) + } +} diff --git a/src/types/main.ts b/src/types/main.ts index ee22ecd..8e4c3d8 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -36,6 +36,24 @@ export interface RedisTransportConfig extends RedisOptions { useMessageBuffer?: boolean } +export enum MqttProtocol { + MQTT = 'mqtt', + MQTTS = 'mqtts', + TCP = 'tcp', + TLS = 'tls', + WS = 'ws', + WSS = 'wss', + WXS = 'wxs', + ALIS = 'alis', +} + +export interface MqttTransportConfig { + host: string + port?: number + protocol?: MqttProtocol + qos?: number +} + export interface Transport { setId: (id: string) => Transport onReconnect: (callback: () => void) => void diff --git a/tests/drivers/mqtt_transport.spec.ts b/tests/drivers/mqtt_transport.spec.ts new file mode 100644 index 0000000..6c023e0 --- /dev/null +++ b/tests/drivers/mqtt_transport.spec.ts @@ -0,0 +1,134 @@ +/** + * @boringnode/bus + * + * @license MIT + * @copyright Boring Node + */ + +import { setTimeout } from 'node:timers/promises' +import { test } from '@japa/runner' +import { HiveMQContainer, StartedHiveMQContainer } from '@testcontainers/hivemq' +import { MqttTransport } from '../../src/transports/mqtt.js' +import { JsonEncoder } from '../../src/encoders/json_encoder.js' +import { TransportEncoder, TransportMessage } from '../../src/types/main.js' + +test.group('Mqtt Transport', (group) => { + let container: StartedHiveMQContainer + + group.setup(async () => { + container = await new HiveMQContainer().start() + + return async () => { + await container.stop() + } + }) + + test('transport should not receive message emitted by itself', async ({ assert, cleanup }) => { + const transport = new MqttTransport({ + host: container.getHost(), + port: container.getPort(), + }).setId('bus') + cleanup(() => transport.disconnect()) + + await transport.subscribe('testing-channel', () => { + assert.fail('Bus should not receive message emitted by itself') + }) + + await transport.publish('testing-channel', 'test') + await setTimeout(1000) + }).disableTimeout() + + test('transport should receive message emitted by another bus', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + + const transport1 = new MqttTransport({ + host: container.getHost(), + port: container.getPort(), + }).setId('bus1') + const transport2 = new MqttTransport({ + host: container.getHost(), + port: container.getPort(), + }).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + await transport1.subscribe('testing-channel', (payload) => { + assert.equal(payload, 'test') + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', 'test') + }).waitForDone() + + test('message should be encoded and decoded correctly when using JSON encoder', async ({ + assert, + cleanup, + }) => { + const transport = new MqttTransport( + { + host: container.getHost(), + port: container.getPort(), + }, + new JsonEncoder() + ).setId('bus') + cleanup(() => transport.disconnect()) + + const data = { test: 'test' } + + await transport.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + }) + + await setTimeout(200) + + await transport.publish('testing-channel', data) + }) + + test('send binary data', async ({ assert, cleanup }, done) => { + assert.plan(1) + + class BinaryEncoder implements TransportEncoder { + encode(message: TransportMessage) { + return Buffer.from(JSON.stringify(message)) + } + + decode(data: string | Buffer) { + const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary') + return JSON.parse(buffer.toString()) + } + } + + const transport1 = new MqttTransport( + { host: container.getHost(), port: container.getMappedPort(1883) }, + new BinaryEncoder() + ).setId('bus1') + + const transport2 = new MqttTransport( + { host: container.getHost(), port: container.getMappedPort(1883) }, + new BinaryEncoder() + ).setId('bus2') + + cleanup(() => { + transport1.disconnect() + transport2.disconnect() + }) + + const data = ['foo', '👍'] + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + await transport2.publish('testing-channel', data) + }).waitForDone() +}) From ea3608fdcca98b430d6e33992d7c51ddfc19ce22 Mon Sep 17 00:00:00 2001 From: MaximeMRF Date: Sat, 25 May 2024 16:44:44 +0800 Subject: [PATCH 2/5] refacto(mqtt): handle options --- src/transports/mqtt.ts | 10 +++++----- src/types/main.ts | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/transports/mqtt.ts b/src/transports/mqtt.ts index 23da354..e92c192 100644 --- a/src/transports/mqtt.ts +++ b/src/transports/mqtt.ts @@ -5,7 +5,7 @@ * @copyright Boring Node */ -import Mqtt from 'mqtt' +import { connect, IClientOptions, MqttClient } from 'mqtt' import { assert } from '@poppinss/utils/assert' import debug from '../debug.js' @@ -26,15 +26,15 @@ export function mqtt(config: MqttTransportConfig) { export class MqttTransport implements Transport { #id: string | undefined - #client: any + #client: MqttClient #url: string readonly #encoder: TransportEncoder - constructor(options: MqttTransportConfig, encoder?: TransportEncoder) { + constructor(config: MqttTransportConfig, encoder?: TransportEncoder) { this.#encoder = encoder ?? new JsonEncoder() - this.#url = `${options.protocol || MqttProtocol.MQTT}://${options.host}${options.port ? `:${options.port}` : ''}` + this.#url = `${config.protocol || MqttProtocol.MQTT}://${config.host}${config.port ? `:${config.port}` : ''}` - this.#client = Mqtt.connect(this.#url) + this.#client = connect(this.#url, config.options ?? {}) } setId(id: string): Transport { diff --git a/src/types/main.ts b/src/types/main.ts index 8e4c3d8..fb8099e 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -6,6 +6,7 @@ */ import type { RedisOptions } from 'ioredis' +import type { IClientOptions } from 'mqtt' export type TransportFactory = () => Transport /** @@ -51,7 +52,7 @@ export interface MqttTransportConfig { host: string port?: number protocol?: MqttProtocol - qos?: number + options?: IClientOptions } export interface Transport { From a6e208908a3690d69ae88a9e74ed5742f82d0f74 Mon Sep 17 00:00:00 2001 From: MaximeMRF Date: Wed, 14 Aug 2024 22:42:49 +0800 Subject: [PATCH 3/5] test(redis): wait the test to finish properly --- tests/drivers/redis_transport.spec.ts | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/tests/drivers/redis_transport.spec.ts b/tests/drivers/redis_transport.spec.ts index 664fcf8..22b8b57 100644 --- a/tests/drivers/redis_transport.spec.ts +++ b/tests/drivers/redis_transport.spec.ts @@ -80,22 +80,28 @@ test.group('Redis Transport', (group) => { test('message should be encoded and decoded correctly when using JSON encoder', async ({ assert, cleanup, - }) => { - const transport = new RedisTransport(container.getConnectionUrl(), new JsonEncoder()).setId( - 'bus' - ) - cleanup(() => transport.disconnect()) + }, done) => { + assert.plan(1) + + const transport1 = new RedisTransport(container.getConnectionUrl()).setId('bus1') + const transport2 = new RedisTransport(container.getConnectionUrl()).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) const data = { test: 'test' } - await transport.subscribe('testing-channel', (payload) => { + await transport1.subscribe('testing-channel', (payload) => { assert.deepEqual(payload, data) + done() }) await setTimeout(200) - await transport.publish('testing-channel', data) - }) + await transport2.publish('testing-channel', data) + }).waitForDone() test('send binary data using useMessageBuffer', async ({ assert, cleanup }, done) => { assert.plan(1) From 33b01e19425f6b8af7b60da7169e0acc03817174 Mon Sep 17 00:00:00 2001 From: MaximeMRF Date: Thu, 15 Aug 2024 01:05:26 +0800 Subject: [PATCH 4/5] tests(mqtt): correct tests and add two broker --- config/mosquitto.conf | 2 + package.json | 1 + src/transports/mqtt.ts | 8 +- tests/drivers/mqtt_transport.spec.ts | 331 ++++++++++++++++++++++++-- tests/drivers/redis_transport.spec.ts | 8 +- 5 files changed, 322 insertions(+), 28 deletions(-) create mode 100644 config/mosquitto.conf diff --git a/config/mosquitto.conf b/config/mosquitto.conf new file mode 100644 index 0000000..6dc5917 --- /dev/null +++ b/config/mosquitto.conf @@ -0,0 +1,2 @@ +listener 1883 0.0.0.0 +allow_anonymous true diff --git a/package.json b/package.json index 2dce5b2..c6c885e 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,7 @@ "mqtt": "^5.6.1", "prettier": "^3.2.5", "release-it": "^17.2.1", + "testcontainers": "^10.9.0", "ts-node": "^10.9.2", "tsup": "^8.0.2", "typescript": "^5.4.5" diff --git a/src/transports/mqtt.ts b/src/transports/mqtt.ts index e92c192..b6a0f62 100644 --- a/src/transports/mqtt.ts +++ b/src/transports/mqtt.ts @@ -5,7 +5,7 @@ * @copyright Boring Node */ -import { connect, IClientOptions, MqttClient } from 'mqtt' +import { connect, MqttClient } from 'mqtt' import { assert } from '@poppinss/utils/assert' import debug from '../debug.js' @@ -20,8 +20,8 @@ import { } from '../types/main.js' import { JsonEncoder } from '../encoders/json_encoder.js' -export function mqtt(config: MqttTransportConfig) { - return () => new MqttTransport(config) +export function mqtt(config: MqttTransportConfig, encoder?: TransportEncoder) { + return () => new MqttTransport(config, encoder) } export class MqttTransport implements Transport { @@ -86,7 +86,7 @@ export class MqttTransport implements Transport { } onReconnect(): void { - this.#client.reconnect(this.#url) + this.#client.reconnect() } async unsubscribe(channel: string): Promise { diff --git a/tests/drivers/mqtt_transport.spec.ts b/tests/drivers/mqtt_transport.spec.ts index 6c023e0..effa71e 100644 --- a/tests/drivers/mqtt_transport.spec.ts +++ b/tests/drivers/mqtt_transport.spec.ts @@ -8,25 +8,51 @@ import { setTimeout } from 'node:timers/promises' import { test } from '@japa/runner' import { HiveMQContainer, StartedHiveMQContainer } from '@testcontainers/hivemq' +import { GenericContainer, StartedTestContainer } from 'testcontainers' import { MqttTransport } from '../../src/transports/mqtt.js' import { JsonEncoder } from '../../src/encoders/json_encoder.js' import { TransportEncoder, TransportMessage } from '../../src/types/main.js' test.group('Mqtt Transport', (group) => { - let container: StartedHiveMQContainer + let hiveMqContainer: StartedHiveMQContainer + let emqxContainer: StartedTestContainer + let mosquittoContainer: StartedTestContainer group.setup(async () => { - container = await new HiveMQContainer().start() + hiveMqContainer = await new HiveMQContainer() + .withExposedPorts({ + container: 1883, + host: 1884, + }) + .start() + emqxContainer = await new GenericContainer('emqx/emqx').withExposedPorts(1883).start() + mosquittoContainer = await new GenericContainer('eclipse-mosquitto') + .withExposedPorts({ + container: 1883, + host: 1885, + }) + .withCopyFilesToContainer([ + { + source: './config/mosquitto.conf', + target: '/mosquitto/config/mosquitto.conf', + }, + ]) + .start() return async () => { - await container.stop() + await hiveMqContainer.stop() + await emqxContainer.stop() + await mosquittoContainer.stop() } }) - test('transport should not receive message emitted by itself', async ({ assert, cleanup }) => { + test('HiveMQ transport should not receive message emitted by itself', async ({ + assert, + cleanup, + }) => { const transport = new MqttTransport({ - host: container.getHost(), - port: container.getPort(), + host: hiveMqContainer.getHost(), + port: hiveMqContainer.getPort(), }).setId('bus') cleanup(() => transport.disconnect()) @@ -38,19 +64,19 @@ test.group('Mqtt Transport', (group) => { await setTimeout(1000) }).disableTimeout() - test('transport should receive message emitted by another bus', async ({ + test('HiveMQ transport should receive message emitted by another bus', async ({ assert, cleanup, }, done) => { assert.plan(1) const transport1 = new MqttTransport({ - host: container.getHost(), - port: container.getPort(), + host: hiveMqContainer.getHost(), + port: hiveMqContainer.getPort(), }).setId('bus1') const transport2 = new MqttTransport({ - host: container.getHost(), - port: container.getPort(), + host: hiveMqContainer.getHost(), + port: hiveMqContainer.getPort(), }).setId('bus2') cleanup(async () => { @@ -68,31 +94,292 @@ test.group('Mqtt Transport', (group) => { await transport2.publish('testing-channel', 'test') }).waitForDone() - test('message should be encoded and decoded correctly when using JSON encoder', async ({ + test('HiveMQ message should be encoded and decoded correctly when using JSON encoder', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + const transport1 = new MqttTransport( + { + host: hiveMqContainer.getHost(), + port: hiveMqContainer.getPort(), + }, + new JsonEncoder() + ).setId('bus1') + const transport2 = new MqttTransport( + { + host: hiveMqContainer.getHost(), + port: hiveMqContainer.getPort(), + }, + new JsonEncoder() + ).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + const data = { test: 'test' } + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', data) + }).waitForDone() + + test('HiveMQ send binary data', async ({ assert, cleanup }, done) => { + assert.plan(1) + + class BinaryEncoder implements TransportEncoder { + encode(message: TransportMessage) { + return Buffer.from(JSON.stringify(message)) + } + + decode(data: string | Buffer) { + const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary') + return JSON.parse(buffer.toString()) + } + } + + const transport1 = new MqttTransport( + { host: hiveMqContainer.getHost(), port: hiveMqContainer.getPort() }, + new BinaryEncoder() + ).setId('bus1') + + const transport2 = new MqttTransport( + { host: hiveMqContainer.getHost(), port: hiveMqContainer.getPort() }, + new BinaryEncoder() + ).setId('bus2') + + cleanup(() => { + transport1.disconnect() + transport2.disconnect() + }) + + const data = ['foo', '👍'] + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + await transport2.publish('testing-channel', data) + }).waitForDone() + + test('EMQX transport should not receive message emitted by itself', async ({ assert, cleanup, }) => { - const transport = new MqttTransport( + const transport = new MqttTransport({ + host: emqxContainer.getHost(), + port: emqxContainer.getMappedPort(1883), + }).setId('bus') + cleanup(() => transport.disconnect()) + + await transport.subscribe('testing-channel', () => { + assert.fail('Bus should not receive message emitted by itself') + }) + + await transport.publish('testing-channel', 'test') + await setTimeout(1000) + }).disableTimeout() + + test('EMQX transport should receive message emitted by another bus', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + + const transport1 = new MqttTransport({ + host: emqxContainer.getHost(), + port: emqxContainer.getMappedPort(1883), + }).setId('bus1') + const transport2 = new MqttTransport({ + host: emqxContainer.getHost(), + port: emqxContainer.getMappedPort(1883), + }).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + await transport1.subscribe('testing-channel', (payload) => { + assert.equal(payload, 'test') + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', 'test') + }).waitForDone() + + test('EMQX message should be encoded and decoded correctly when using JSON encoder', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + const transport1 = new MqttTransport( + { + host: emqxContainer.getHost(), + port: emqxContainer.getMappedPort(1883), + }, + new JsonEncoder() + ).setId('bus1') + const transport2 = new MqttTransport( { - host: container.getHost(), - port: container.getPort(), + host: emqxContainer.getHost(), + port: emqxContainer.getMappedPort(1883), }, new JsonEncoder() - ).setId('bus') + ).setId('bus2') + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + const data = { test: 'test' } + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', data) + }).waitForDone() + + test('EMQX send binary data', async ({ assert, cleanup }, done) => { + assert.plan(1) + + class BinaryEncoder implements TransportEncoder { + encode(message: TransportMessage) { + return Buffer.from(JSON.stringify(message)) + } + + decode(data: string | Buffer) { + const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary') + return JSON.parse(buffer.toString()) + } + } + + const transport1 = new MqttTransport( + { host: emqxContainer.getHost(), port: emqxContainer.getMappedPort(1883) }, + new BinaryEncoder() + ).setId('bus1') + + const transport2 = new MqttTransport( + { host: emqxContainer.getHost(), port: emqxContainer.getMappedPort(1883) }, + new BinaryEncoder() + ).setId('bus2') + + cleanup(() => { + transport1.disconnect() + transport2.disconnect() + }) + + const data = ['foo', '👍'] + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + await transport2.publish('testing-channel', data) + }).waitForDone() + + test('Mosquitto transport should not receive message emitted by itself', async ({ + assert, + cleanup, + }) => { + const transport = new MqttTransport({ + host: mosquittoContainer.getHost(), + port: mosquittoContainer.getMappedPort(1883), + }).setId('bus') cleanup(() => transport.disconnect()) + await transport.subscribe('testing-channel', () => { + assert.fail('Bus should not receive message emitted by itself') + }) + + await transport.publish('testing-channel', 'test') + await setTimeout(1000) + }).disableTimeout() + + test('Mosquitto transport should receive message emitted by another bus', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + + const transport1 = new MqttTransport({ + host: mosquittoContainer.getHost(), + port: mosquittoContainer.getMappedPort(1883), + }).setId('bus1') + const transport2 = new MqttTransport({ + host: mosquittoContainer.getHost(), + port: mosquittoContainer.getMappedPort(1883), + }).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + await transport1.subscribe('testing-channel', (payload) => { + assert.equal(payload, 'test') + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', 'test') + }).waitForDone() + + test('Mosquitto message should be encoded and decoded correctly when using JSON encoder', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + const transport1 = new MqttTransport( + { + host: mosquittoContainer.getHost(), + port: mosquittoContainer.getMappedPort(1883), + }, + new JsonEncoder() + ).setId('bus1') + const transport2 = new MqttTransport( + { + host: mosquittoContainer.getHost(), + port: mosquittoContainer.getMappedPort(1883), + }, + new JsonEncoder() + ).setId('bus2') + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + const data = { test: 'test' } - await transport.subscribe('testing-channel', (payload) => { + await transport1.subscribe('testing-channel', (payload) => { assert.deepEqual(payload, data) + done() }) await setTimeout(200) - await transport.publish('testing-channel', data) - }) + await transport2.publish('testing-channel', data) + }).waitForDone() - test('send binary data', async ({ assert, cleanup }, done) => { + test('Mosquitto send binary data', async ({ assert, cleanup }, done) => { assert.plan(1) class BinaryEncoder implements TransportEncoder { @@ -107,12 +394,12 @@ test.group('Mqtt Transport', (group) => { } const transport1 = new MqttTransport( - { host: container.getHost(), port: container.getMappedPort(1883) }, + { host: mosquittoContainer.getHost(), port: mosquittoContainer.getMappedPort(1883) }, new BinaryEncoder() ).setId('bus1') const transport2 = new MqttTransport( - { host: container.getHost(), port: container.getMappedPort(1883) }, + { host: mosquittoContainer.getHost(), port: mosquittoContainer.getMappedPort(1883) }, new BinaryEncoder() ).setId('bus2') diff --git a/tests/drivers/redis_transport.spec.ts b/tests/drivers/redis_transport.spec.ts index 22b8b57..9e32f28 100644 --- a/tests/drivers/redis_transport.spec.ts +++ b/tests/drivers/redis_transport.spec.ts @@ -83,8 +83,12 @@ test.group('Redis Transport', (group) => { }, done) => { assert.plan(1) - const transport1 = new RedisTransport(container.getConnectionUrl()).setId('bus1') - const transport2 = new RedisTransport(container.getConnectionUrl()).setId('bus2') + const transport1 = new RedisTransport(container.getConnectionUrl(), new JsonEncoder()).setId( + 'bus1' + ) + const transport2 = new RedisTransport(container.getConnectionUrl(), new JsonEncoder()).setId( + 'bus2' + ) cleanup(async () => { await transport1.disconnect() From 675695d5bed7f8e11c85e707e3b246a9d5d728ca Mon Sep 17 00:00:00 2001 From: MaximeMRF Date: Wed, 2 Oct 2024 16:22:26 +0800 Subject: [PATCH 5/5] docs(mqtt) --- README.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5d00147..c0d9816 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ Currently, it supports the following transports:

👉 Memory: A simple in-memory transport for testing purposes.
👉 Redis: A Redis transport for production usage. +👉 Mqtt: A Mqtt transport for production usage.

## Table of Contents @@ -47,6 +48,7 @@ The module exposes a manager that can be used to register buses. ```typescript import { BusManager } from '@boringnode/bus' import { redis } from '@boringnode/bus/transports/redis' +import { mqtt } from '@boringnode/bus/transports/mqtt' import { memory } from '@boringnode/bus/transports/memory' const manager = new BusManager({ @@ -60,7 +62,13 @@ const manager = new BusManager({ host: 'localhost', port: 6379, }), - } + }, + mqtt: { + transport: mqtt({ + host: 'localhost', + port: 1883, + }), + }, } }) ``` @@ -79,6 +87,7 @@ By default, the bus will use the `default` transport. You can specify different ```typescript manager.use('redis').publish('channel', 'Hello world') +manager.use('mqtt').publish('channel', 'Hello world') ``` ### Without the manager