diff --git a/README.md b/README.md
index 5d00147..c0d9816 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,7 @@ Currently, it supports the following transports:
👉 Memory: A simple in-memory transport for testing purposes.
👉 Redis: A Redis transport for production usage.
+👉 Mqtt: A Mqtt transport for production usage.
## Table of Contents
@@ -47,6 +48,7 @@ The module exposes a manager that can be used to register buses.
```typescript
import { BusManager } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'
+import { mqtt } from '@boringnode/bus/transports/mqtt'
import { memory } from '@boringnode/bus/transports/memory'
const manager = new BusManager({
@@ -60,7 +62,13 @@ const manager = new BusManager({
host: 'localhost',
port: 6379,
}),
- }
+ },
+ mqtt: {
+ transport: mqtt({
+ host: 'localhost',
+ port: 1883,
+ }),
+ },
}
})
```
@@ -79,6 +87,7 @@ By default, the bus will use the `default` transport. You can specify different
```typescript
manager.use('redis').publish('channel', 'Hello world')
+manager.use('mqtt').publish('channel', 'Hello world')
```
### Without the manager
diff --git a/config/mosquitto.conf b/config/mosquitto.conf
new file mode 100644
index 0000000..6dc5917
--- /dev/null
+++ b/config/mosquitto.conf
@@ -0,0 +1,2 @@
+listener 1883 0.0.0.0
+allow_anonymous true
diff --git a/package.json b/package.json
index 399fc23..c6c885e 100644
--- a/package.json
+++ b/package.json
@@ -36,6 +36,7 @@
"@japa/expect-type": "^2.0.2",
"@japa/runner": "^3.1.4",
"@swc/core": "^1.5.7",
+ "@testcontainers/hivemq": "^10.9.0",
"@testcontainers/redis": "^10.9.0",
"@types/node": "^20.12.12",
"@types/object-hash": "^3.0.6",
@@ -43,8 +44,10 @@
"del-cli": "^5.1.0",
"eslint": "^8.57.0",
"ioredis": "^5.4.1",
+ "mqtt": "^5.6.1",
"prettier": "^3.2.5",
"release-it": "^17.2.1",
+ "testcontainers": "^10.9.0",
"ts-node": "^10.9.2",
"tsup": "^8.0.2",
"typescript": "^5.4.5"
diff --git a/src/transports/mqtt.ts b/src/transports/mqtt.ts
new file mode 100644
index 0000000..b6a0f62
--- /dev/null
+++ b/src/transports/mqtt.ts
@@ -0,0 +1,95 @@
+/**
+ * @boringnode/bus
+ *
+ * @license MIT
+ * @copyright Boring Node
+ */
+
+import { connect, MqttClient } from 'mqtt'
+import { assert } from '@poppinss/utils/assert'
+
+import debug from '../debug.js'
+import {
+ Transport,
+ TransportEncoder,
+ TransportMessage,
+ Serializable,
+ SubscribeHandler,
+ MqttProtocol,
+ MqttTransportConfig,
+} from '../types/main.js'
+import { JsonEncoder } from '../encoders/json_encoder.js'
+
+export function mqtt(config: MqttTransportConfig, encoder?: TransportEncoder) {
+ return () => new MqttTransport(config, encoder)
+}
+
+export class MqttTransport implements Transport {
+ #id: string | undefined
+ #client: MqttClient
+ #url: string
+ readonly #encoder: TransportEncoder
+
+ constructor(config: MqttTransportConfig, encoder?: TransportEncoder) {
+ this.#encoder = encoder ?? new JsonEncoder()
+ this.#url = `${config.protocol || MqttProtocol.MQTT}://${config.host}${config.port ? `:${config.port}` : ''}`
+
+ this.#client = connect(this.#url, config.options ?? {})
+ }
+
+ setId(id: string): Transport {
+ this.#id = id
+
+ return this
+ }
+
+ async disconnect(): Promise {
+ await this.#client.endAsync()
+ }
+
+ async publish(channel: string, message: any): Promise {
+ assert(this.#id, 'You must set an id before publishing a message')
+
+ const encoded = this.#encoder.encode({ payload: message, busId: this.#id })
+
+ await this.#client.publishAsync(channel, encoded)
+ }
+
+ async subscribe(
+ channel: string,
+ handler: SubscribeHandler
+ ): Promise {
+ this.#client.subscribe(channel, (err) => {
+ if (err) {
+ throw err
+ }
+ })
+
+ this.#client.on('message', (receivedChannel: string, message: Buffer | string) => {
+ if (channel !== receivedChannel) return
+
+ debug('received message for channel "%s"', channel)
+
+ const data = this.#encoder.decode>(message)
+
+ /**
+ * Ignore messages published by this bus instance
+ */
+ if (data.busId === this.#id) {
+ debug('ignoring message published by the same bus instance')
+ return
+ }
+
+ // @ts-expect-error - TODO: Weird typing issue
+ handler(data.payload)
+ })
+ }
+
+ onReconnect(): void {
+ this.#client.reconnect()
+ }
+
+ async unsubscribe(channel: string): Promise {
+ await this.#client.unsubscribeAsync(channel)
+ }
+}
diff --git a/src/types/main.ts b/src/types/main.ts
index ee22ecd..fb8099e 100644
--- a/src/types/main.ts
+++ b/src/types/main.ts
@@ -6,6 +6,7 @@
*/
import type { RedisOptions } from 'ioredis'
+import type { IClientOptions } from 'mqtt'
export type TransportFactory = () => Transport
/**
@@ -36,6 +37,24 @@ export interface RedisTransportConfig extends RedisOptions {
useMessageBuffer?: boolean
}
+export enum MqttProtocol {
+ MQTT = 'mqtt',
+ MQTTS = 'mqtts',
+ TCP = 'tcp',
+ TLS = 'tls',
+ WS = 'ws',
+ WSS = 'wss',
+ WXS = 'wxs',
+ ALIS = 'alis',
+}
+
+export interface MqttTransportConfig {
+ host: string
+ port?: number
+ protocol?: MqttProtocol
+ options?: IClientOptions
+}
+
export interface Transport {
setId: (id: string) => Transport
onReconnect: (callback: () => void) => void
diff --git a/tests/drivers/mqtt_transport.spec.ts b/tests/drivers/mqtt_transport.spec.ts
new file mode 100644
index 0000000..effa71e
--- /dev/null
+++ b/tests/drivers/mqtt_transport.spec.ts
@@ -0,0 +1,421 @@
+/**
+ * @boringnode/bus
+ *
+ * @license MIT
+ * @copyright Boring Node
+ */
+
+import { setTimeout } from 'node:timers/promises'
+import { test } from '@japa/runner'
+import { HiveMQContainer, StartedHiveMQContainer } from '@testcontainers/hivemq'
+import { GenericContainer, StartedTestContainer } from 'testcontainers'
+import { MqttTransport } from '../../src/transports/mqtt.js'
+import { JsonEncoder } from '../../src/encoders/json_encoder.js'
+import { TransportEncoder, TransportMessage } from '../../src/types/main.js'
+
+test.group('Mqtt Transport', (group) => {
+ let hiveMqContainer: StartedHiveMQContainer
+ let emqxContainer: StartedTestContainer
+ let mosquittoContainer: StartedTestContainer
+
+ group.setup(async () => {
+ hiveMqContainer = await new HiveMQContainer()
+ .withExposedPorts({
+ container: 1883,
+ host: 1884,
+ })
+ .start()
+ emqxContainer = await new GenericContainer('emqx/emqx').withExposedPorts(1883).start()
+ mosquittoContainer = await new GenericContainer('eclipse-mosquitto')
+ .withExposedPorts({
+ container: 1883,
+ host: 1885,
+ })
+ .withCopyFilesToContainer([
+ {
+ source: './config/mosquitto.conf',
+ target: '/mosquitto/config/mosquitto.conf',
+ },
+ ])
+ .start()
+
+ return async () => {
+ await hiveMqContainer.stop()
+ await emqxContainer.stop()
+ await mosquittoContainer.stop()
+ }
+ })
+
+ test('HiveMQ transport should not receive message emitted by itself', async ({
+ assert,
+ cleanup,
+ }) => {
+ const transport = new MqttTransport({
+ host: hiveMqContainer.getHost(),
+ port: hiveMqContainer.getPort(),
+ }).setId('bus')
+ cleanup(() => transport.disconnect())
+
+ await transport.subscribe('testing-channel', () => {
+ assert.fail('Bus should not receive message emitted by itself')
+ })
+
+ await transport.publish('testing-channel', 'test')
+ await setTimeout(1000)
+ }).disableTimeout()
+
+ test('HiveMQ transport should receive message emitted by another bus', async ({
+ assert,
+ cleanup,
+ }, done) => {
+ assert.plan(1)
+
+ const transport1 = new MqttTransport({
+ host: hiveMqContainer.getHost(),
+ port: hiveMqContainer.getPort(),
+ }).setId('bus1')
+ const transport2 = new MqttTransport({
+ host: hiveMqContainer.getHost(),
+ port: hiveMqContainer.getPort(),
+ }).setId('bus2')
+
+ cleanup(async () => {
+ await transport1.disconnect()
+ await transport2.disconnect()
+ })
+
+ await transport1.subscribe('testing-channel', (payload) => {
+ assert.equal(payload, 'test')
+ done()
+ })
+
+ await setTimeout(200)
+
+ await transport2.publish('testing-channel', 'test')
+ }).waitForDone()
+
+ test('HiveMQ message should be encoded and decoded correctly when using JSON encoder', async ({
+ assert,
+ cleanup,
+ }, done) => {
+ assert.plan(1)
+ const transport1 = new MqttTransport(
+ {
+ host: hiveMqContainer.getHost(),
+ port: hiveMqContainer.getPort(),
+ },
+ new JsonEncoder()
+ ).setId('bus1')
+ const transport2 = new MqttTransport(
+ {
+ host: hiveMqContainer.getHost(),
+ port: hiveMqContainer.getPort(),
+ },
+ new JsonEncoder()
+ ).setId('bus2')
+
+ cleanup(async () => {
+ await transport1.disconnect()
+ await transport2.disconnect()
+ })
+
+ const data = { test: 'test' }
+
+ await transport1.subscribe('testing-channel', (payload) => {
+ assert.deepEqual(payload, data)
+ done()
+ })
+
+ await setTimeout(200)
+
+ await transport2.publish('testing-channel', data)
+ }).waitForDone()
+
+ test('HiveMQ send binary data', async ({ assert, cleanup }, done) => {
+ assert.plan(1)
+
+ class BinaryEncoder implements TransportEncoder {
+ encode(message: TransportMessage) {
+ return Buffer.from(JSON.stringify(message))
+ }
+
+ decode(data: string | Buffer) {
+ const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary')
+ return JSON.parse(buffer.toString())
+ }
+ }
+
+ const transport1 = new MqttTransport(
+ { host: hiveMqContainer.getHost(), port: hiveMqContainer.getPort() },
+ new BinaryEncoder()
+ ).setId('bus1')
+
+ const transport2 = new MqttTransport(
+ { host: hiveMqContainer.getHost(), port: hiveMqContainer.getPort() },
+ new BinaryEncoder()
+ ).setId('bus2')
+
+ cleanup(() => {
+ transport1.disconnect()
+ transport2.disconnect()
+ })
+
+ const data = ['foo', '👍']
+
+ await transport1.subscribe('testing-channel', (payload) => {
+ assert.deepEqual(payload, data)
+ done()
+ })
+
+ await setTimeout(200)
+ await transport2.publish('testing-channel', data)
+ }).waitForDone()
+
+ test('EMQX transport should not receive message emitted by itself', async ({
+ assert,
+ cleanup,
+ }) => {
+ const transport = new MqttTransport({
+ host: emqxContainer.getHost(),
+ port: emqxContainer.getMappedPort(1883),
+ }).setId('bus')
+ cleanup(() => transport.disconnect())
+
+ await transport.subscribe('testing-channel', () => {
+ assert.fail('Bus should not receive message emitted by itself')
+ })
+
+ await transport.publish('testing-channel', 'test')
+ await setTimeout(1000)
+ }).disableTimeout()
+
+ test('EMQX transport should receive message emitted by another bus', async ({
+ assert,
+ cleanup,
+ }, done) => {
+ assert.plan(1)
+
+ const transport1 = new MqttTransport({
+ host: emqxContainer.getHost(),
+ port: emqxContainer.getMappedPort(1883),
+ }).setId('bus1')
+ const transport2 = new MqttTransport({
+ host: emqxContainer.getHost(),
+ port: emqxContainer.getMappedPort(1883),
+ }).setId('bus2')
+
+ cleanup(async () => {
+ await transport1.disconnect()
+ await transport2.disconnect()
+ })
+
+ await transport1.subscribe('testing-channel', (payload) => {
+ assert.equal(payload, 'test')
+ done()
+ })
+
+ await setTimeout(200)
+
+ await transport2.publish('testing-channel', 'test')
+ }).waitForDone()
+
+ test('EMQX message should be encoded and decoded correctly when using JSON encoder', async ({
+ assert,
+ cleanup,
+ }, done) => {
+ assert.plan(1)
+ const transport1 = new MqttTransport(
+ {
+ host: emqxContainer.getHost(),
+ port: emqxContainer.getMappedPort(1883),
+ },
+ new JsonEncoder()
+ ).setId('bus1')
+ const transport2 = new MqttTransport(
+ {
+ host: emqxContainer.getHost(),
+ port: emqxContainer.getMappedPort(1883),
+ },
+ new JsonEncoder()
+ ).setId('bus2')
+ cleanup(async () => {
+ await transport1.disconnect()
+ await transport2.disconnect()
+ })
+
+ const data = { test: 'test' }
+
+ await transport1.subscribe('testing-channel', (payload) => {
+ assert.deepEqual(payload, data)
+ done()
+ })
+
+ await setTimeout(200)
+
+ await transport2.publish('testing-channel', data)
+ }).waitForDone()
+
+ test('EMQX send binary data', async ({ assert, cleanup }, done) => {
+ assert.plan(1)
+
+ class BinaryEncoder implements TransportEncoder {
+ encode(message: TransportMessage) {
+ return Buffer.from(JSON.stringify(message))
+ }
+
+ decode(data: string | Buffer) {
+ const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary')
+ return JSON.parse(buffer.toString())
+ }
+ }
+
+ const transport1 = new MqttTransport(
+ { host: emqxContainer.getHost(), port: emqxContainer.getMappedPort(1883) },
+ new BinaryEncoder()
+ ).setId('bus1')
+
+ const transport2 = new MqttTransport(
+ { host: emqxContainer.getHost(), port: emqxContainer.getMappedPort(1883) },
+ new BinaryEncoder()
+ ).setId('bus2')
+
+ cleanup(() => {
+ transport1.disconnect()
+ transport2.disconnect()
+ })
+
+ const data = ['foo', '👍']
+
+ await transport1.subscribe('testing-channel', (payload) => {
+ assert.deepEqual(payload, data)
+ done()
+ })
+
+ await setTimeout(200)
+ await transport2.publish('testing-channel', data)
+ }).waitForDone()
+
+ test('Mosquitto transport should not receive message emitted by itself', async ({
+ assert,
+ cleanup,
+ }) => {
+ const transport = new MqttTransport({
+ host: mosquittoContainer.getHost(),
+ port: mosquittoContainer.getMappedPort(1883),
+ }).setId('bus')
+ cleanup(() => transport.disconnect())
+
+ await transport.subscribe('testing-channel', () => {
+ assert.fail('Bus should not receive message emitted by itself')
+ })
+
+ await transport.publish('testing-channel', 'test')
+ await setTimeout(1000)
+ }).disableTimeout()
+
+ test('Mosquitto transport should receive message emitted by another bus', async ({
+ assert,
+ cleanup,
+ }, done) => {
+ assert.plan(1)
+
+ const transport1 = new MqttTransport({
+ host: mosquittoContainer.getHost(),
+ port: mosquittoContainer.getMappedPort(1883),
+ }).setId('bus1')
+ const transport2 = new MqttTransport({
+ host: mosquittoContainer.getHost(),
+ port: mosquittoContainer.getMappedPort(1883),
+ }).setId('bus2')
+
+ cleanup(async () => {
+ await transport1.disconnect()
+ await transport2.disconnect()
+ })
+
+ await transport1.subscribe('testing-channel', (payload) => {
+ assert.equal(payload, 'test')
+ done()
+ })
+
+ await setTimeout(200)
+
+ await transport2.publish('testing-channel', 'test')
+ }).waitForDone()
+
+ test('Mosquitto message should be encoded and decoded correctly when using JSON encoder', async ({
+ assert,
+ cleanup,
+ }, done) => {
+ assert.plan(1)
+ const transport1 = new MqttTransport(
+ {
+ host: mosquittoContainer.getHost(),
+ port: mosquittoContainer.getMappedPort(1883),
+ },
+ new JsonEncoder()
+ ).setId('bus1')
+ const transport2 = new MqttTransport(
+ {
+ host: mosquittoContainer.getHost(),
+ port: mosquittoContainer.getMappedPort(1883),
+ },
+ new JsonEncoder()
+ ).setId('bus2')
+ cleanup(async () => {
+ await transport1.disconnect()
+ await transport2.disconnect()
+ })
+
+ const data = { test: 'test' }
+
+ await transport1.subscribe('testing-channel', (payload) => {
+ assert.deepEqual(payload, data)
+ done()
+ })
+
+ await setTimeout(200)
+
+ await transport2.publish('testing-channel', data)
+ }).waitForDone()
+
+ test('Mosquitto send binary data', async ({ assert, cleanup }, done) => {
+ assert.plan(1)
+
+ class BinaryEncoder implements TransportEncoder {
+ encode(message: TransportMessage) {
+ return Buffer.from(JSON.stringify(message))
+ }
+
+ decode(data: string | Buffer) {
+ const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary')
+ return JSON.parse(buffer.toString())
+ }
+ }
+
+ const transport1 = new MqttTransport(
+ { host: mosquittoContainer.getHost(), port: mosquittoContainer.getMappedPort(1883) },
+ new BinaryEncoder()
+ ).setId('bus1')
+
+ const transport2 = new MqttTransport(
+ { host: mosquittoContainer.getHost(), port: mosquittoContainer.getMappedPort(1883) },
+ new BinaryEncoder()
+ ).setId('bus2')
+
+ cleanup(() => {
+ transport1.disconnect()
+ transport2.disconnect()
+ })
+
+ const data = ['foo', '👍']
+
+ await transport1.subscribe('testing-channel', (payload) => {
+ assert.deepEqual(payload, data)
+ done()
+ })
+
+ await setTimeout(200)
+ await transport2.publish('testing-channel', data)
+ }).waitForDone()
+})
diff --git a/tests/drivers/redis_transport.spec.ts b/tests/drivers/redis_transport.spec.ts
index 664fcf8..9e32f28 100644
--- a/tests/drivers/redis_transport.spec.ts
+++ b/tests/drivers/redis_transport.spec.ts
@@ -80,22 +80,32 @@ test.group('Redis Transport', (group) => {
test('message should be encoded and decoded correctly when using JSON encoder', async ({
assert,
cleanup,
- }) => {
- const transport = new RedisTransport(container.getConnectionUrl(), new JsonEncoder()).setId(
- 'bus'
+ }, done) => {
+ assert.plan(1)
+
+ const transport1 = new RedisTransport(container.getConnectionUrl(), new JsonEncoder()).setId(
+ 'bus1'
)
- cleanup(() => transport.disconnect())
+ const transport2 = new RedisTransport(container.getConnectionUrl(), new JsonEncoder()).setId(
+ 'bus2'
+ )
+
+ cleanup(async () => {
+ await transport1.disconnect()
+ await transport2.disconnect()
+ })
const data = { test: 'test' }
- await transport.subscribe('testing-channel', (payload) => {
+ await transport1.subscribe('testing-channel', (payload) => {
assert.deepEqual(payload, data)
+ done()
})
await setTimeout(200)
- await transport.publish('testing-channel', data)
- })
+ await transport2.publish('testing-channel', data)
+ }).waitForDone()
test('send binary data using useMessageBuffer', async ({ assert, cleanup }, done) => {
assert.plan(1)