diff --git a/README.md b/README.md
index dc3b49c..a65c0ab 100644
--- a/README.md
+++ b/README.md
@@ -16,11 +16,11 @@
`@rlanz/bus` is a service bus implementation for Node.js. It is designed to be simple and easy to use.
-Currently, it supports the following drivers:
+Currently, it supports the following transports:
-👉 Memory: A simple in-memory driver for testing purposes.
-👉 Redis: A Redis driver for production usage.
+👉 Memory: A simple in-memory transport for testing purposes.
+👉 Redis: A Redis transport for production usage.
## Table of Contents
@@ -46,17 +46,17 @@ The module exposes a manager that can be used to register buses.
```typescript
import { BusManager } from '@rlanz/bus'
-import { redis } from "@rlanz/bus/drivers/redis"
-import { memory } from "@rlanz/bus/drivers/memory"
+import { redis } from '@rlanz/bus/transports/redis'
+import { memory } from '@rlanz/bus/transports/memory'
const manager = new BusManager({
default: 'main',
transports: {
main: {
- driver: memory(),
+ transport: memory(),
},
redis: {
- driver: redis({
+ transport: redis({
host: 'localhost',
port: 6379,
}),
@@ -81,6 +81,26 @@ By default, the bus will use the `default` transport. You can specify different
manager.use('redis').publish('channel', 'Hello world')
```
+### Without the manager
+
+If you don't need multiple buses, you can create a single bus directly by importing the transports and the Bus class.
+
+```typescript
+import { Bus } from '@rlanz/bus'
+import { RedisTransport } from '@rlanz/bus/transports/redis'
+
+const transport = new RedisTransport({
+ host: 'localhost',
+ port: 6379,
+})
+
+const bus = new Bus(transport, {
+ retryQueue: {
+ retryInterval: '100ms'
+ }
+})
+```
+
## Retry Queue
The bus also supports a retry queue. When a message fails to be published, it will be moved to the retry queue.
@@ -92,7 +112,7 @@ const manager = new BusManager({
default: 'main',
transports: {
main: {
- driver: redis({
+ transport: redis({
host: 'localhost',
port: 6379,
}),
@@ -126,6 +146,26 @@ export interface RetryQueueOptions {
}
```
+## Test helpers
+
+The module also provides some test helpers to make it easier to test the code that relies on the bus. First, you can use the `MemoryTransport` to create a bus that uses an in-memory transport.
+
+You can also use the `ChaosTransport` to simulate a transport that fails randomly, in order to test the resilience of your code.
+
+```ts
+import { Bus } from '@rlanz/bus'
+import { ChaosTransport } from '@rlanz/bus/test_helpers'
+
+const buggyTransport = new ChaosTransport(new MemoryTransport())
+const bus = new Bus(buggyTransport)
+
+/**
+ * Now, every time you will try to publish a message, the transport
+ * will throw an error.
+ */
+buggyTransport.alwaysThrow()
+```
+
[gh-workflow-image]: https://img.shields.io/github/actions/workflow/status/romainlanz/bus/test.yml?branch=main&style=for-the-badge
[gh-workflow-url]: https://github.com/romainlanz/bus/actions/workflows/test.yml
[npm-image]: https://img.shields.io/npm/v/@rlanz/bus.svg?style=for-the-badge&logo=npm
diff --git a/drivers/memory.ts b/drivers/memory.ts
deleted file mode 100644
index 0687e81..0000000
--- a/drivers/memory.ts
+++ /dev/null
@@ -1,12 +0,0 @@
-/**
- * @rlanz/bus
- *
- * @license MIT
- * @copyright Romain Lanz
- */
-
-import { MemoryTransport } from '../src/drivers/memory_transport.js'
-
-export function memory() {
- return () => new MemoryTransport()
-}
diff --git a/drivers/redis.ts b/drivers/redis.ts
deleted file mode 100644
index 5f5dbff..0000000
--- a/drivers/redis.ts
+++ /dev/null
@@ -1,13 +0,0 @@
-/**
- * @rlanz/bus
- *
- * @license MIT
- * @copyright Romain Lanz
- */
-
-import { RedisTransport } from '../src/drivers/redis_transport.js'
-import type { RedisTransportConfig, TransportEncoder } from '../src/types/main.js'
-
-export function redis(config: RedisTransportConfig, encoder?: TransportEncoder) {
- return () => new RedisTransport(config, encoder)
-}
diff --git a/package.json b/package.json
index 8ef203d..0012634 100644
--- a/package.json
+++ b/package.json
@@ -12,7 +12,8 @@
],
"exports": {
".": "./build/index.js",
- "./drivers/*": "./build/drivers/*.js",
+ "./transports/*": "./build/src/transports/*.js",
+ "./test_helpers": "./build/src/test_helpers/index.js",
"./types/*": "./build/src/types/*.js"
},
"scripts": {
@@ -25,6 +26,14 @@
"update:toc": "npx doctoc README.md",
"test": "c8 node --loader ts-node/esm --enable-source-maps bin/test.ts"
},
+ "peerDependencies": {
+ "ioredis": "^5.0.0"
+ },
+ "peerDependenciesMeta": {
+ "ioredis": {
+ "optional": true
+ }
+ },
"dependencies": {
"@paralleldrive/cuid2": "^2.2.2",
"@poppinss/utils": "^6.7.2",
@@ -39,6 +48,7 @@
"@japa/runner": "^3.1.1",
"@swc/core": "^1.4.0",
"@testcontainers/redis": "^10.7.1",
+ "@types/node": "^20.12.5",
"@types/object-hash": "^3.0.6",
"c8": "^9.1.0",
"del-cli": "^5.1.0",
diff --git a/src/bus.ts b/src/bus.ts
index 20d5fb1..4b4c07a 100644
--- a/src/bus.ts
+++ b/src/bus.ts
@@ -12,13 +12,13 @@ import debug from './debug.js'
import type { RetryQueueOptions, Serializable, SubscribeHandler, Transport } from './types/main.js'
export class Bus {
- readonly #driver: Transport
+ readonly #transport: Transport
readonly #busId: string
readonly #errorRetryQueue: RetryQueue
readonly #retryQueueInterval: NodeJS.Timeout | undefined
- constructor(driver: Transport, options?: { retryQueue?: RetryQueueOptions }) {
- this.#driver = driver
+ constructor(transport: Transport, options?: { retryQueue?: RetryQueueOptions }) {
+ this.#transport = transport
this.#busId = createId()
this.#errorRetryQueue = new RetryQueue(options?.retryQueue)
@@ -33,7 +33,7 @@ export class Bus {
}, intervalValue)
}
- driver.setId(this.#busId).onReconnect(() => this.#onReconnect())
+ transport.setId(this.#busId).onReconnect(() => this.#onReconnect())
}
getRetryQueue() {
@@ -50,7 +50,7 @@ export class Bus {
}
async #onReconnect() {
- debug(`bus driver ${this.#driver.constructor.name} reconnected`)
+ debug(`bus transport ${this.#transport.constructor.name} reconnected`)
await this.processErrorRetryQueue()
}
@@ -58,7 +58,7 @@ export class Bus {
subscribe(channel: string, handler: SubscribeHandler) {
debug(`subscribing to channel ${channel}`)
- return this.#driver.subscribe(channel, async (message) => {
+ return this.#transport.subscribe(channel, async (message) => {
debug('received message %j from bus', message)
// @ts-expect-error - TODO: Weird typing issue
handler(message)
@@ -69,7 +69,7 @@ export class Bus {
try {
debug('publishing message "%j" to channel "%s"', message, channel)
- await this.#driver.publish(channel, message)
+ await this.#transport.publish(channel, message)
return true
} catch (error) {
@@ -92,10 +92,10 @@ export class Bus {
clearInterval(this.#retryQueueInterval)
}
- return this.#driver.disconnect()
+ return this.#transport.disconnect()
}
unsubscribe(channel: string) {
- return this.#driver.unsubscribe(channel)
+ return this.#transport.unsubscribe(channel)
}
}
diff --git a/src/bus_manager.ts b/src/bus_manager.ts
index a0b9c21..738e869 100644
--- a/src/bus_manager.ts
+++ b/src/bus_manager.ts
@@ -43,11 +43,11 @@ export class BusManager>
return cachedTransport
}
- const driverConfig = this.#transports[transportToUse]
+ const transportConfig = this.#transports[transportToUse]
debug('creating new transport instance for %s', transportToUse)
- const transportInstance = new Bus(driverConfig.driver(), {
- retryQueue: driverConfig.retryQueue,
+ const transportInstance = new Bus(transportConfig.transport(), {
+ retryQueue: transportConfig.retryQueue,
})
this.#transportsCache[transportToUse] = transportInstance
diff --git a/src/test_helpers.ts b/src/test_helpers.ts
new file mode 100644
index 0000000..5b91e2b
--- /dev/null
+++ b/src/test_helpers.ts
@@ -0,0 +1,8 @@
+/**
+ * @rlanz/bus
+ *
+ * @license MIT
+ * @copyright Romain Lanz
+ */
+
+export { ChaosTransport } from "../test_helpers/chaos_transport.js";
diff --git a/src/drivers/memory_transport.ts b/src/transports/memory_transport.ts
similarity index 96%
rename from src/drivers/memory_transport.ts
rename to src/transports/memory_transport.ts
index 36164ce..c3e2a13 100644
--- a/src/drivers/memory_transport.ts
+++ b/src/transports/memory_transport.ts
@@ -7,6 +7,10 @@
import type { Transport, Serializable, SubscribeHandler } from '../types/main.js'
+export function memory() {
+ return () => new MemoryTransport()
+}
+
export class MemoryTransport implements Transport {
#id!: string
diff --git a/src/drivers/redis_transport.ts b/src/transports/redis_transport.ts
similarity index 94%
rename from src/drivers/redis_transport.ts
rename to src/transports/redis_transport.ts
index e548e60..fbccff3 100644
--- a/src/drivers/redis_transport.ts
+++ b/src/transports/redis_transport.ts
@@ -15,8 +15,13 @@ import type {
TransportMessage,
Serializable,
SubscribeHandler,
+ RedisTransportConfig,
} from '../types/main.js'
+export function redis(config: RedisTransportConfig, encoder?: TransportEncoder) {
+ return () => new RedisTransport(config, encoder)
+}
+
export class RedisTransport implements Transport {
readonly #publisher: Redis
readonly #subscriber: Redis
diff --git a/src/types/main.ts b/src/types/main.ts
index 0ca7a62..753be74 100644
--- a/src/types/main.ts
+++ b/src/types/main.ts
@@ -23,7 +23,7 @@ export interface ManagerConfig {
test('create bus instance from the manager', ({ assert, expectTypeOf }) => {
@@ -16,7 +16,7 @@ test.group('Bus Manager', () => {
default: 'memory',
transports: {
memory: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
},
})
@@ -32,10 +32,10 @@ test.group('Bus Manager', () => {
default: 'memory',
transports: {
memory: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
memory1: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
},
})
@@ -53,7 +53,7 @@ test.group('Bus Manager', () => {
default: 'memory',
transports: {
memory: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
},
})
@@ -65,7 +65,7 @@ test.group('Bus Manager', () => {
const manager = new BusManager({
transports: {
memory: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
},
})
@@ -81,7 +81,7 @@ test.group('Bus Manager', () => {
default: 'memory',
transports: {
memory: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
retryQueue: {
enabled: false,
maxSize: 100,
@@ -104,10 +104,10 @@ test.group('Bus Manager', () => {
default: 'memory1',
transports: {
memory1: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
memory2: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
},
})
@@ -128,10 +128,10 @@ test.group('Bus Manager', () => {
default: 'memory1',
transports: {
memory1: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
memory2: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
},
})
@@ -152,10 +152,10 @@ test.group('Bus Manager', () => {
default: 'memory1',
transports: {
memory1: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
memory2: {
- driver: () => new MemoryTransport(),
+ transport: () => new MemoryTransport(),
},
},
})
diff --git a/tests/drivers/memory_transport.spec.ts b/tests/drivers/memory_transport.spec.ts
index 047a277..27ed60e 100644
--- a/tests/drivers/memory_transport.spec.ts
+++ b/tests/drivers/memory_transport.spec.ts
@@ -7,7 +7,7 @@
import { setTimeout } from 'node:timers/promises'
import { test } from '@japa/runner'
-import { MemoryTransport } from '../../src/drivers/memory_transport.js'
+import { MemoryTransport } from '../../src/transports/memory_transport.js'
test.group('Memory Transport', () => {
test('transport should not receive message emitted by itself', async ({ assert, cleanup }) => {
diff --git a/tests/drivers/redis_transport.spec.ts b/tests/drivers/redis_transport.spec.ts
index 0de7960..73b247c 100644
--- a/tests/drivers/redis_transport.spec.ts
+++ b/tests/drivers/redis_transport.spec.ts
@@ -8,7 +8,7 @@
import { setTimeout } from 'node:timers/promises'
import { test } from '@japa/runner'
import { RedisContainer, StartedRedisContainer } from '@testcontainers/redis'
-import { RedisTransport } from '../../src/drivers/redis_transport.js'
+import { RedisTransport } from '../../src/transports/redis_transport.js'
import { JsonEncoder } from '../../src/encoders/json_encoder.js'
test.group('Redis Transport', (group) => {
diff --git a/tsconfig.json b/tsconfig.json
index ad0cc44..46e1cb1 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -4,4 +4,4 @@
"rootDir": "./",
"outDir": "./build"
}
-}
+}