Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions cloudflare-webhook-agent-ingest/src/db/node-postgres.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Pool, types } from 'pg';
import { Client, types } from 'pg';

import type { CreateDatabaseConnection, Database } from './database.js';

Expand All @@ -10,21 +10,21 @@ types.setTypeParser(types.builtins.INT8, val => parseInt(val, 10));
// types.setTypeParser(types.builtins.TIMESTAMPTZ, (val) => new Date(val))
// types.setTypeParser(types.builtins.TIMESTAMP, (val) => new Date(val))

/**
* Creates a new database connection for each invocation.
*/
export const createNodePostgresConnection: CreateDatabaseConnection = connectionString => {
const pool = new Pool({
connectionString,
max: 10,
connectionTimeoutMillis: 5000,
statement_timeout: 10 * 1000,
});

pool.on('error', error => console.error('Pool:error - Unexpected error on idle client', error));
// Helper to create and connect a new client
const createConnectedClient = async (): Promise<Client> => {
const client = new Client({ connectionString });
Comment thread
pandemicsyn marked this conversation as resolved.
await client.connect();
return client;
};

return {
__kind: 'Database',
begin: async transactionFn => {
// Pull an available client from pg-pool
const client = await pool.connect();
const client = await createConnectedClient();

try {
// Start the transaction
Expand Down Expand Up @@ -52,18 +52,22 @@ export const createNodePostgresConnection: CreateDatabaseConnection = connection
await client.query('rollback');
throw e;
} finally {
// Always release the client back to the pool!
client.release();
// Always close the client connection
await client.end();
Comment thread
pandemicsyn marked this conversation as resolved.
}
},
end: async () => {
// no-op with node-postgres since we just query from the pool
// no-op - each operation manages its own client
},
query: async (text, values = {}) => {
const result = await pool.query(text, Object.values(values));
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return result.rows ?? [];
const client = await createConnectedClient();
try {
const result = await client.query(text, Object.values(values));
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return result.rows ?? [];
} finally {
await client.end();
}
},
// casting as Database here so we don't have to manually fill in function args
} satisfies Database;
};
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,8 @@ type ResolveProfileParams = {
orgId?: string | null;
};

// Singleton instance for connection pooling
let singleton: ProfileResolutionService | null = null;
Copy link
Copy Markdown
Contributor Author

@pandemicsyn pandemicsyn Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this would really cause issues, but i definitely didn't have this singleton in my spec - i think the robot snuck this one past me at some point.


/**
* Get or create the singleton ProfileResolutionService instance.
* This ensures we reuse the database connection pool across messages.
*/
export function getProfileResolutionService(env: ProfileResolutionEnv): ProfileResolutionService {
if (!singleton) {
singleton = new ProfileResolutionService(env);
}
return singleton;
return new ProfileResolutionService(env);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,8 @@ type MintTokenResult = {
// Fixed botId for webhook tokens - used for attribution/analytics
const WEBHOOK_BOT_ID = 'webhook-bot';

// Singleton instance for connection pooling
let singleton: TokenMintingService | null = null;

/**
* Get or create the singleton TokenMintingService instance.
* This ensures we reuse the database connection pool across messages.
*/
export function getTokenMintingService(env: TokenMintingEnv): TokenMintingService {
Comment thread
pandemicsyn marked this conversation as resolved.
if (!singleton) {
singleton = new TokenMintingService(env);
}
return singleton;
return new TokenMintingService(env);
}

/**
Expand Down