Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Currently, it supports the following transports:
<p>
👉 <strong>Memory:</strong> A simple in-memory transport for testing purposes.<br />
👉 <strong>Redis:</strong> A Redis transport for production usage.
👉 <strong>Mqtt:</strong> A Mqtt transport for production usage.
</p>

## Table of Contents
Expand All @@ -47,6 +48,7 @@ The module exposes a manager that can be used to register buses.
```typescript
import { BusManager } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'
import { mqtt } from '@boringnode/bus/transports/mqtt'
import { memory } from '@boringnode/bus/transports/memory'

const manager = new BusManager({
Expand All @@ -60,7 +62,13 @@ const manager = new BusManager({
host: 'localhost',
port: 6379,
}),
}
},
mqtt: {
transport: mqtt({
host: 'localhost',
port: 1883,
}),
},
}
})
```
Expand All @@ -79,6 +87,7 @@ By default, the bus will use the `default` transport. You can specify different

```typescript
manager.use('redis').publish('channel', 'Hello world')
manager.use('mqtt').publish('channel', 'Hello world')
```

### Without the manager
Expand Down
2 changes: 2 additions & 0 deletions config/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
listener 1883 0.0.0.0
allow_anonymous true
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@
"@japa/expect-type": "^2.0.2",
"@japa/runner": "^3.1.4",
"@swc/core": "^1.5.7",
"@testcontainers/hivemq": "^10.9.0",
"@testcontainers/redis": "^10.9.0",
"@types/node": "^20.12.12",
"@types/object-hash": "^3.0.6",
"c8": "^9.1.0",
"del-cli": "^5.1.0",
"eslint": "^8.57.0",
"ioredis": "^5.4.1",
"mqtt": "^5.6.1",
"prettier": "^3.2.5",
"release-it": "^17.2.1",
"testcontainers": "^10.9.0",
"ts-node": "^10.9.2",
"tsup": "^8.0.2",
"typescript": "^5.4.5"
Expand Down
95 changes: 95 additions & 0 deletions src/transports/mqtt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* @boringnode/bus
*
* @license MIT
* @copyright Boring Node
*/

import { connect, MqttClient } from 'mqtt'
import { assert } from '@poppinss/utils/assert'

import debug from '../debug.js'
import {
Transport,
TransportEncoder,
TransportMessage,
Serializable,
SubscribeHandler,
MqttProtocol,
MqttTransportConfig,
} from '../types/main.js'
import { JsonEncoder } from '../encoders/json_encoder.js'

export function mqtt(config: MqttTransportConfig, encoder?: TransportEncoder) {
return () => new MqttTransport(config, encoder)
}

export class MqttTransport implements Transport {
#id: string | undefined
#client: MqttClient
#url: string
readonly #encoder: TransportEncoder

constructor(config: MqttTransportConfig, encoder?: TransportEncoder) {
this.#encoder = encoder ?? new JsonEncoder()
this.#url = `${config.protocol || MqttProtocol.MQTT}://${config.host}${config.port ? `:${config.port}` : ''}`

this.#client = connect(this.#url, config.options ?? {})
}

setId(id: string): Transport {
this.#id = id

return this
}

async disconnect(): Promise<void> {
await this.#client.endAsync()
}

async publish(channel: string, message: any): Promise<void> {
assert(this.#id, 'You must set an id before publishing a message')

const encoded = this.#encoder.encode({ payload: message, busId: this.#id })

await this.#client.publishAsync(channel, encoded)
}

async subscribe<T extends Serializable>(
channel: string,
handler: SubscribeHandler<T>
): Promise<void> {
this.#client.subscribe(channel, (err) => {
if (err) {
throw err
}
})

this.#client.on('message', (receivedChannel: string, message: Buffer | string) => {
if (channel !== receivedChannel) return

debug('received message for channel "%s"', channel)

const data = this.#encoder.decode<TransportMessage<T>>(message)

/**
* Ignore messages published by this bus instance
*/
if (data.busId === this.#id) {
debug('ignoring message published by the same bus instance')
return
}

// @ts-expect-error - TODO: Weird typing issue
handler(data.payload)
})
}

onReconnect(): void {
this.#client.reconnect()
}

async unsubscribe(channel: string): Promise<void> {
await this.#client.unsubscribeAsync(channel)
}
}
19 changes: 19 additions & 0 deletions src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import type { RedisOptions } from 'ioredis'
import type { IClientOptions } from 'mqtt'
export type TransportFactory = () => Transport

/**
Expand Down Expand Up @@ -36,6 +37,24 @@ export interface RedisTransportConfig extends RedisOptions {
useMessageBuffer?: boolean
}

export enum MqttProtocol {
MQTT = 'mqtt',
MQTTS = 'mqtts',
TCP = 'tcp',
TLS = 'tls',
WS = 'ws',
WSS = 'wss',
WXS = 'wxs',
ALIS = 'alis',
}

export interface MqttTransportConfig {
host: string
port?: number
protocol?: MqttProtocol
options?: IClientOptions
}

export interface Transport {
setId: (id: string) => Transport
onReconnect: (callback: () => void) => void
Expand Down
Loading