Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,522 changes: 999 additions & 523 deletions package-lock.json

Large diffs are not rendered by default.

115 changes: 115 additions & 0 deletions packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ A NestJS module implementing the [Transactional Outbox Pattern](https://microser
- **Graceful Shutdown**: In-flight events complete before application terminates
- **Multiple ORMs**: TypeORM and MikroORM drivers included
- **Flexible Processing**: Immediate or deferred event processing per event type
- **Middleware Support**: Intercept event processing for logging, tracing, and error reporting

## How It Works

Expand Down Expand Up @@ -239,6 +240,120 @@ The `PostgreSQLEventListener`:
- Works alongside polling as a fallback mechanism
- Requires the LISTEN/NOTIFY migration from `OutboxMigrations`

## Middleware

Middlewares allow you to intercept and enhance event processing with cross-cutting concerns like logging, tracing, or error reporting.

### Creating a Middleware

```typescript
import { Injectable } from '@nestjs/common';
import { OutboxMiddleware, OutboxMiddlewareContext } from '@fullstackhouse/nestjs-outbox';

@Injectable()
export class LoggerMiddleware implements OutboxMiddleware {
name = 'logger';

async process(
context: OutboxMiddlewareContext,
next: () => Promise<void>,
): Promise<void> {
const startTime = Date.now();

console.log(`Processing event ${context.event.eventName} for listener ${context.listener.getName()}`);

try {
await next();
const duration = Date.now() - startTime;
console.log(`Completed in ${duration}ms`);
} catch (error) {
console.error(`Failed after ${Date.now() - startTime}ms`, error);
throw error;
}
}
}
```

### Registering Middlewares

```typescript
@Module({
imports: [
OutboxModule.registerAsync({
imports: [MikroOrmModule.forFeature([MikroOrmOutboxTransportEvent])],
useFactory: (orm: MikroORM, loggerMiddleware: LoggerMiddleware) => ({
driverFactory: new MikroORMDatabaseDriverFactory(orm),
events: [/* ... */],
retryEveryMilliseconds: 30_000,
maxOutboxTransportEventPerRetry: 10,
middlewares: [loggerMiddleware],
}),
inject: [MikroORM, LoggerMiddleware],
}),
],
providers: [LoggerMiddleware],
})
export class AppModule {}
```

### Middleware Context

The `OutboxMiddlewareContext` provides access to:
- `event` - The outbox transport event being processed
- `listener` - The listener handling the event
- `eventOptions` - Configuration for the event type

### Common Middleware Use Cases

**Error Reporting (Sentry)**
```typescript
@Injectable()
export class SentryMiddleware implements OutboxMiddleware {
name = 'sentry';

async process(context: OutboxMiddlewareContext, next: () => Promise<void>): Promise<void> {
try {
await next();
} catch (error) {
Sentry.captureException(error, {
tags: {
eventName: context.event.eventName,
listenerName: context.listener.getName(),
},
});
throw error;
}
}
}
```

**OpenTelemetry Tracing**
```typescript
@Injectable()
export class TracingMiddleware implements OutboxMiddleware {
name = 'tracing';

async process(context: OutboxMiddlewareContext, next: () => Promise<void>): Promise<void> {
const span = tracer.startSpan('outbox.process', {
attributes: {
'event.name': context.event.eventName,
'listener.name': context.listener.getName(),
},
});

try {
await next();
span.setStatus({ code: SpanStatusCode.OK });
} catch (error) {
span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
throw error;
} finally {
span.end();
}
}
}
```

## Graceful Shutdown

The module automatically handles graceful shutdown:
Expand Down
8 changes: 5 additions & 3 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
"author": "Full Stack House",
"license": "MIT",
"peerDependencies": {
"@nestjs/common": "^10.0.0",
"@nestjs/core": "^10.0.0"
"@nestjs/common": "^10.0.0 || ^11.0.0",
"@nestjs/core": "^10.0.0 || ^11.0.0"
},
"devDependencies": {
"@nestjs/testing": "^10.4.1",
"@nestjs/common": "^11.0.0",
"@nestjs/core": "^11.0.0",
"@nestjs/testing": "^11.0.0",
"@swc/core": "^1.7.42",
"@types/node": "^20.16.5",
"@typescript-eslint/eslint-plugin": "^8.5.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ export * from "./driver/database-driver.factory";
export * from "./driver/database.driver";
export * from "./emitter/contract/outbox-event.interface";
export { TransactionalEventEmitter, TransactionalEventEmitterOperations } from "./emitter/transactional-event-emitter";
export * from "./middleware/outbox-middleware.interface";
export * from "./outbox.module";
export * from "./outbox.module-definition";
export * from "./listener/contract/listener.interface";
export * from "./listener/discovery/on-event.decorator";
export * from "./model/outbox-transport-event.interface";
export * from "./poller/event-listener.interface";
export * from "./resolver/event-configuration-resolver.contract";

43 changes: 43 additions & 0 deletions packages/core/src/middleware/outbox-middleware.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { OutboxTransportEvent } from '../model/outbox-transport-event.interface';

export interface OutboxEventContext {
eventName: string;
eventPayload: unknown;
eventId: number;
listenerName: string;
}

export interface OutboxListenerResult {
success: boolean;
error?: Error;
durationMs: number;
}

export interface OutboxMiddleware {
beforeProcess?(context: OutboxEventContext): void | Promise<void>;
afterProcess?(context: OutboxEventContext, result: OutboxListenerResult): void | Promise<void>;
onError?(context: OutboxEventContext, error: Error): void | Promise<void>;
wrapExecution?<T>(context: OutboxEventContext, next: () => Promise<T>): Promise<T>;
}

export type OutboxMiddlewareClass = new (...args: any[]) => OutboxMiddleware;

export interface OutboxHooks {
beforeProcess?: (context: OutboxEventContext) => void | Promise<void>;
afterProcess?: (context: OutboxEventContext, result: OutboxListenerResult) => void | Promise<void>;
onError?: (context: OutboxEventContext, error: Error) => void | Promise<void>;
}

export const OUTBOX_MIDDLEWARES_TOKEN = 'OUTBOX_MIDDLEWARES_TOKEN';

export function createOutboxEventContext(
outboxTransportEvent: OutboxTransportEvent,
listenerName: string,
): OutboxEventContext {
return {
eventName: outboxTransportEvent.eventName,
eventPayload: outboxTransportEvent.eventPayload,
eventId: outboxTransportEvent.id,
listenerName,
};
}
13 changes: 12 additions & 1 deletion packages/core/src/outbox.module-definition.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ConfigurableModuleBuilder } from '@nestjs/common';
import { ConfigurableModuleBuilder, Type } from '@nestjs/common';
import { DatabaseDriverFactory } from './driver/database-driver.factory';
import { OutboxHooks, OutboxMiddleware } from './middleware/outbox-middleware.interface';

export interface OutboxModuleEventOptions {
name: string;
Expand All @@ -23,6 +24,16 @@ export interface OutboxModuleOptions {
retryEveryMilliseconds: number;
maxOutboxTransportEventPerRetry: number;
driverFactory: DatabaseDriverFactory;
/**
* Class-based middlewares for event processing hooks.
* Middlewares are instantiated via NestJS DI and can inject dependencies.
*/
middlewares?: Type<OutboxMiddleware>[];
/**
* Function-based hooks for event processing.
* Alternative to class-based middlewares for simpler use cases.
*/
hooks?: OutboxHooks;
}

export const { ConfigurableModuleClass, MODULE_OPTIONS_TOKEN, ASYNC_OPTIONS_TYPE } = new ConfigurableModuleBuilder<OutboxModuleOptions>()
Expand Down
52 changes: 46 additions & 6 deletions packages/core/src/outbox.module.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,47 @@
import { DynamicModule, Logger, Module, Provider } from '@nestjs/common';
import { DiscoveryModule } from '@nestjs/core';
import { DynamicModule, Logger, Module, Provider, Type } from '@nestjs/common';
import { DiscoveryModule, ModuleRef } from '@nestjs/core';
import { DATABASE_DRIVER_FACTORY_TOKEN, DatabaseDriverFactory } from './driver/database-driver.factory';
import { TransactionalEventEmitter } from './emitter/transactional-event-emitter';
import { EventValidator } from './event-validator/event.validator';
import { ASYNC_OPTIONS_TYPE, ConfigurableModuleClass, OutboxModuleOptions, MODULE_OPTIONS_TOKEN } from './outbox.module-definition';
import { ListenerDiscovery } from './listener/discovery/listener.discovery';
import { OutboxMiddleware, OUTBOX_MIDDLEWARES_TOKEN } from './middleware/outbox-middleware.interface';
import { EVENT_LISTENER_TOKEN } from './poller/event-listener.interface';
import { RetryableOutboxEventPoller } from './poller/retryable-outbox-event.poller';
import { OUTBOX_EVENT_PROCESSOR_TOKEN } from './processor/outbox-event-processor.contract';
import { OutboxEventProcessor } from './processor/outbox-event.processor';
import { EVENT_CONFIGURATION_RESOLVER_TOKEN } from './resolver/event-configuration-resolver.contract';
import { EventConfigurationResolver } from './resolver/event-configuration.resolver';

class HooksMiddlewareAdapter implements OutboxMiddleware {
constructor(private hooks: NonNullable<OutboxModuleOptions['hooks']>) {}

async beforeProcess(context: Parameters<NonNullable<OutboxMiddleware['beforeProcess']>>[0]) {
await this.hooks.beforeProcess?.(context);
}

async afterProcess(
context: Parameters<NonNullable<OutboxMiddleware['afterProcess']>>[0],
result: Parameters<NonNullable<OutboxMiddleware['afterProcess']>>[1],
) {
await this.hooks.afterProcess?.(context, result);
}

async onError(
context: Parameters<NonNullable<OutboxMiddleware['onError']>>[0],
error: Parameters<NonNullable<OutboxMiddleware['onError']>>[1],
) {
await this.hooks.onError?.(context, error);
}
}

@Module({
imports: [DiscoveryModule],
providers: [
Logger,
{
provide: OUTBOX_EVENT_PROCESSOR_TOKEN,
useFactory: (logger: Logger, databaseDriverFactory: DatabaseDriverFactory, eventConfigurationResolver: EventConfigurationResolver) => {
return new OutboxEventProcessor(logger, databaseDriverFactory, eventConfigurationResolver);
},
inject: [Logger, DATABASE_DRIVER_FACTORY_TOKEN, EventConfigurationResolver],
useClass: OutboxEventProcessor,
},
{
provide: EVENT_CONFIGURATION_RESOLVER_TOKEN,
Expand Down Expand Up @@ -62,6 +82,26 @@ export class OutboxModule extends ConfigurableModuleClass {
},
inject: [MODULE_OPTIONS_TOKEN],
} as Provider<any>,
{
provide: OUTBOX_MIDDLEWARES_TOKEN,
useFactory: async (options: OutboxModuleOptions, moduleRef: ModuleRef): Promise<OutboxMiddleware[]> => {
const middlewares: OutboxMiddleware[] = [];

if (options.middlewares) {
for (const MiddlewareClass of options.middlewares) {
const instance = await moduleRef.create(MiddlewareClass);
middlewares.push(instance);
}
}

if (options.hooks) {
middlewares.push(new HooksMiddlewareAdapter(options.hooks));
}

return middlewares;
},
inject: [MODULE_OPTIONS_TOKEN, ModuleRef],
} as Provider<any>,
],
exports: [TransactionalEventEmitter],
};
Expand Down
Loading
Loading