feat: implements distributed semaphore using PostgreSQL advisory locks in api#2680
feat: implements distributed semaphore using PostgreSQL advisory locks in api#2680ygrishajev merged 1 commit intomainfrom
Conversation
📝 WalkthroughWalkthroughAdds a Postgres-backed semaphore library (PgSemaphore, MemorySemaphore, SemaphoreFactory), replaces the old in-process semaphore decorator with a pg-semaphore decorator, wires a pg-semaphore provider, updates services/tests to use the new API, and adds unit/functional tests and test setup adjustments. Changes
Sequence Diagram(s)sequenceDiagram
participant App as App Startup
participant Provider as PgSemaphore Provider
participant Config as CORE_CONFIG
participant Client as Postgres Client
participant Factory as SemaphoreFactory
App->>Provider: run APP_INITIALIZER
Provider->>Config: resolve POSTGRES_DB_URI
Config-->>Provider: DB URI
Provider->>Client: create DB client (pool)
Client-->>Provider: client instance
Provider->>Factory: SemaphoreFactory.configure(client)
Provider->>Provider: register disposal to close client
sequenceDiagram
participant Service as Billing Service Method
participant Decorator as Semaphore Decorator
participant Factory as SemaphoreFactory
participant Semaphore as PgSemaphore/MemorySemaphore
participant DB as PostgreSQL
Service->>Decorator: call method(args)
Decorator->>Factory: create(key from class/method/args)
Factory-->>Semaphore: return semaphore instance
Service->>Semaphore: withLock(fn)
alt PgSemaphore
Semaphore->>DB: begin tx + pg_advisory_xact_lock(key)
DB-->>Semaphore: lock acquired
Semaphore->>Service: execute fn(tx)
Service-->>Semaphore: fn result
Semaphore->>DB: commit tx (releases lock)
else MemorySemaphore
Semaphore->>Semaphore: acquire in-memory lock
Semaphore->>Service: execute fn()
Service-->>Semaphore: fn result
Semaphore->>Semaphore: release lock
end
Semaphore-->>Service: return result
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
❌ 1 Tests Failed:
View the top 1 failed test(s) by shortest run time
To view more test analytics, go to the Test Analytics Dashboard |
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@apps/api/src/core/lib/pg-semaphore/pg-semaphore.spec.ts`:
- Line 9: Replace the hardcoded root describe string with the class's name
property to make the suite refactor-safe: update the root test suite declaration
(the top-level describe that currently uses "PgSemaphore") to use
PgSemaphore.name instead so the suite title references the actual class symbol.
- Around line 21-105: Rename the test descriptions in the PgSemaphore spec to
use present simple (3rd person singular) and remove the leading "should" — e.g.
change "should execute function and return result" to "executes function and
returns result", "should serialize concurrent executions with the same key" to
"serializes concurrent executions with the same key", "should allow concurrent
executions with different keys" to "allows concurrent executions with different
keys", "should release lock when function throws" to "releases lock when
function throws", and "should work with default sql from static configuration"
to "works with default sql from static configuration"; update the it(...)
strings in the tests that instantiate PgSemaphore and call withLock so
descriptions reflect this convention.
In `@apps/api/src/core/lib/pg-semaphore/pg-semaphore.ts`:
- Around line 110-118: The field waitingCount in PgSemaphore is incremented on
entry to withLock() and decremented on exit, so its semantics differ from
memorySema.nrWaiting() (which counts callers blocked waiting); update the
implementation to remove ambiguity by renaming waitingCount to activeCount (or
an equivalent name) across the class (including references in withLock() and any
accessors), and update or add documentation/comments for nrWaiting() to state
that SQL mode exposes active callers rather than blocked waiters; ensure all
usages and tests referencing waitingCount or nrWaiting() are updated to the new
name/semantics to keep behavior and monitoring consistent.
In `@apps/api/src/core/lib/pg-semaphore/semaphore.decorator.ts`:
- Around line 5-23: The semaphore map currently grows unbounded because every
unique JSON args string creates a PgSemaphore stored in semaphores; fix by
adding a refcount map alongside semaphores (e.g., refCounts: Map<string,
number>), increment the count in getSemaphore whenever you return/create a
semaphore, and after the awaited semaphore.withLock(...) completes (use a
try/finally around originalMethod.apply(...)) decrement the refcount and delete
the entry from semaphores (and refCounts) when the count hits zero; update
references in descriptor.value (semaphoredFunction), getSemaphore,
PgSemaphore/PgSemaphoreImpl usage to ensure cleanup runs even on errors.
- Around line 18-20: The lock key creation in semaphoredFunction uses
JSON.stringify on args which can throw for circular refs or BigInt and cause
collisions for omitted values; update descriptor.value (semaphoredFunction) to
use a safe serializer or accept an optional caller-provided keyBuilder and fall
back safely: wrap JSON.stringify in try/catch and on error or unsupported types
produce a deterministic fallback (e.g., stable replacer, canonical stringify, or
a hash of inspected values) before calling getSemaphore with `${keyPrefix}:...`,
ensuring identical inputs map to the same key and no uncaught exceptions occur.
🧹 Nitpick comments (3)
apps/api/src/core/lib/pg-semaphore/pg-semaphore.ts (1)
29-32: Consider adding a static reset method for test isolation.Static state (
defaultSql,useMemoryLock) persists across test runs. If tests configure different modes, state can leak between tests.🧪 Optional: Add reset for testing
export class PgSemaphore { private static defaultSql: Sql | null = null; private static useMemoryLock = false; + + /** `@internal` For testing only - resets static configuration */ + static reset(): void { + PgSemaphore.defaultSql = null; + PgSemaphore.useMemoryLock = false; + }apps/api/src/core/lib/pg-semaphore/semaphore.decorator.spec.ts (2)
4-4: Use function reference for describe suite description.As per coding guidelines, use
createSemaphoreDecorator.nameinstead of a hardcoded string to enable automated refactoring tools to find all references.♻️ Suggested fix
-describe("Semaphore decorator", () => { +describe(createSemaphoreDecorator.name, () => {
5-69: Use present simple tense without "should" prefix in test descriptions.As per coding guidelines, test descriptions should use present simple, 3rd person singular without prepending "should".
♻️ Suggested refactoring for test descriptions
- it("should call original method and return result", async () => { + it("calls original method and returns result", async () => {- it("should create semaphore with key including class name, method name and args", async () => { + it("creates semaphore with key including class name, method name and args", async () => {- it("should use withLock to execute the method", async () => { + it("uses withLock to execute the method", async () => {- it("should reuse semaphore for same arguments", async () => { + it("reuses semaphore for same arguments", async () => {- it("should create different semaphores for different arguments", async () => { + it("creates different semaphores for different arguments", async () => {- it("should handle multiple arguments in key", async () => { + it("handles multiple arguments in key", async () => {- it("should propagate errors from original method", async () => { + it("propagates errors from original method", async () => {- it("should preserve this context", async () => { + it("preserves this context", async () => {
6a469f0 to
80777bd
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
apps/api/src/user/repositories/user/user.repository.ts (1)
69-80:⚠️ Potential issue | 🟠 MajorBug:
lastUserAgenttruncation not applied on conflict update.The
withoutUsernameobject on line 76 bypassestoInput(), which truncateslastUserAgenttouserAgentMaxLength. When an upsert triggers theON CONFLICT DO UPDATEpath,lastUserAgentwon't be truncated, potentially causing database errors or data integrity issues.🐛 Proposed fix to apply `toInput()` consistently
async upsertOnExternalIdConflict(data: UserInput): Promise<UserOutput> { - const { username, ...withoutUsername } = data; + const { username: _username, ...withoutUsername } = data; const [item] = await this.cursor .insert(this.table) .values(this.toInput(data)) .onConflictDoUpdate({ target: [this.table.userId], - set: withoutUsername + set: this.toInput(withoutUsername) }) .returning(); return this.toOutput(item); }Note: Also prefixed
usernamewith_to indicate intentional non-use and avoid potential linter warnings.apps/api/src/billing/services/refill/refill.service.spec.ts (1)
94-125:⚠️ Potential issue | 🟡 MinorAlign
setuphelpers with test conventions.There are two
setuphelpers inside nested describes and they lack the required single inline-typed parameter. Please consolidate to a single root-levelsetup(bottom of the rootdescribe) and add a single inline-typed param. Example signature tweak below (apply to the shared helper).♻️ Example adjustment (apply to the shared root-level setup)
- function setup() { - PgSemaphore.configure({ useMemoryLock: true }); + function setup({ useMemoryLock = true }: { useMemoryLock?: boolean } = {}) { + if (useMemoryLock) { + PgSemaphore.configure({ useMemoryLock: true }); + } const billingConfig = mock<BillingConfig>(); const userWalletRepository = mock<UserWalletRepository>(); const managedUserWalletService = mock<ManagedUserWalletService>(); const managedSignerService = mock<ManagedSignerService>(); const balancesService = mock<BalancesService>(); const walletInitializerService = mock<WalletInitializerService>(); const analyticsService = mock<AnalyticsService>();As per coding guidelines: Use
setupfunction instead ofbeforeEachin unit and service level tests. Thesetupfunction must be at the bottom of the rootdescribeblock, should create an object under test and return it, accept a single parameter with inline type definition, avoid shared state, and not have a specified return type.Also applies to: 198-229
🤖 Fix all issues with AI agents
In `@apps/api/src/core/lib/pg-semaphore/pg-semaphore.ts`:
- Around line 55-63: The configure overload in PgSemaphore leaves
PgSemaphore.useMemoryLock set to true forever when previously enabled; change
PgSemaphore.configure so that when called with a Sql config (i.e., the branch
where you assign PgSemaphore.defaultSql) you also explicitly clear/reset the
memory-mode flag (set PgSemaphore.useMemoryLock = false) so subsequent
configure(sql) calls use DB-backed locking; update PgSemaphore.configure to set
defaultSql = config as Sql and reset useMemoryLock when "useMemoryLock" is not
present or false.
80777bd to
3fb461f
Compare
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@apps/api/src/core/lib/pg-semaphore/pg-semaphore.spec.ts`:
- Around line 9-11: The test instantiates the DB client using
process.env.POSTGRES_DB_URI! which will throw an opaque exception if the env var
is missing; update the pg-semaphore.spec.ts test setup (where PgSemaphore.name
and the const db are defined) to guard that environment variable exists and fail
with a clear message or skip the tests: check process.env.POSTGRES_DB_URI before
calling postgres(), and either throw a descriptive Error like "POSTGRES_DB_URI
not set for pg-semaphore tests" or call the test framework's skip/describe.skip
to avoid running when the variable is absent.
In `@apps/api/src/core/lib/pg-semaphore/semaphore.decorator.spec.ts`:
- Around line 71-105: The setup function currently has no parameters—update its
signature to accept a single parameter with an inline type definition (e.g.
setup(opts: { /* optional overrides */ } = {})) and remove any explicit return
type; keep building MockPgSemaphore, mockWithLock,
createSemaphoreDecorator(PgSemaphore), TestClass and instance inside it and
return the object as before so tests can pass optional overrides via that single
inline-typed parameter. Ensure setup remains at the bottom of the root describe
block and does not rely on shared state.
- Line 4: The root test suite is using a hardcoded string ("Semaphore
decorator") which breaks automated refactors; change the root describe to use
the subject's .name property instead (replace describe("Semaphore decorator",
...) with describe(SemaphoreDecorator.name, ...)) so the test suite name is tied
directly to the SemaphoreDecorator symbol and stays correct during refactors.
🧹 Nitpick comments (1)
apps/api/src/billing/lib/batch-signing-client/batch-signing-client.service.ts (1)
78-82: Consider reusinggetAddress()insidegetSemaphore()to avoid duplicate wallet calls.This keeps wallet access memoized in one place.
♻️ Suggested refactor
private readonly getSemaphore = memoizeAsync(async () => { - const address = await this.wallet.getFirstAddress(); + const address = await this.getAddress(); this.semaphore = new PgSemaphore(`BatchSigningClientService:${address}`); return this.semaphore; });
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In
`@apps/api/src/billing/lib/batch-signing-client/batch-signing-client.service.spec.ts`:
- Around line 222-223: The current setup function signature uses a positional
parameter array (setup(testData: TransactionTestData[])); change it to accept a
single inline-typed parameter object (e.g. setup(opts: { testData:
TransactionTestData[] }) with no explicit return type), update all call sites to
pass an object (setup({ testData: [...] })), keep the PgSemaphore.configure({
useMemoryLock: true }) call inside setup, ensure setup is placed at the bottom
of the root describe block, and have setup construct and return the
system-under-test (no shared state) instead of using beforeEach.
In `@apps/api/src/core/lib/pg-semaphore/pg-semaphore.ts`:
- Around line 32-38: The issue is that in memory mode multiple PgSemaphore
instances for the same lockKey can be created concurrently (lockKey, memorySema)
because the current caching check is racy; fix by introducing a static cache and
an atomic initializer: add a static Map<number, Sema> (e.g.,
PgSemaphore.memorySemaMap) and a static Map<number, Promise<Sema>> (e.g.,
PgSemaphore.memorySemaInitMap) and replace direct construction in the
PgSemaphore constructor with a static async factory method (e.g.,
PgSemaphore.create(...)) that calls a new private static async
getOrCreateMemorySema(lockKey) which (1) checks memorySemaMap and returns the
Sema if present, (2) if absent checks memorySemaInitMap for an in-flight Promise
and awaits it, (3) if none, creates a Promise that constructs the Sema, stores
it in memorySemaInitMap while creating, then stores the created Sema into
memorySemaMap and deletes the init entry before resolving; update usages to call
PgSemaphore.create(...) and set this.memorySema from the awaited result so only
one Sema per lockKey is ever created atomically.
In `@apps/api/src/core/lib/pg-semaphore/semaphore.decorator.spec.ts`:
- Line 4: Replace the hardcoded root describe string with a dynamic reference by
using createSemaphoreDecorator.name; locate the test's top-level
describe("Semaphore decorator", ...) and change it to
describe(createSemaphoreDecorator.name, ...) so automated refactors can find the
test by the function's real name.
- Around line 71-78: The setup helper currently uses jest.fn() mocks and no
parameter; change setup to accept a single inline-typed parameter (e.g.,
options: { key?: string } or the exact typed param required) and replace
jest.fn() usages with mocks from jest-mock-extended: create a mockWithLock using
mock<Function>() or mock<ReturnType> and create MockPgSemaphore using
mock<typeof PgSemaphore>() from jest-mock-extended, then set
MockPgSemaphore.withLock = mockWithLock (or configure the mock's implementation)
and pass MockPgSemaphore into createSemaphoreDecorator so tests use the typed
jest-mock-extended mocks instead of jest.fn(); reference symbols: setup,
mockWithLock, MockPgSemaphore, PgSemaphore, createSemaphoreDecorator.
🧹 Nitpick comments (1)
apps/api/src/billing/lib/batch-signing-client/batch-signing-client.service.ts (1)
78-80: Reuse the memoized address for semaphore initialization.Using
getAddress()avoids an extra wallet call and keeps memoization in one place.♻️ Suggested refactor
- const address = await this.wallet.getFirstAddress(); + const address = await this.getAddress();
apps/api/src/billing/lib/batch-signing-client/batch-signing-client.service.spec.ts
Outdated
Show resolved
Hide resolved
9ae2738 to
5cf6acc
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@apps/api/src/user/repositories/user/user.repository.ts`:
- Around line 69-76: The upsertOnExternalIdConflict implementation inserts
normalized input with this.toInput(data) but uses raw withoutUsername for the
onConflict update, skipping normalization; change to compute a single
normalizedInput = this.toInput(data), then destructure username out of
normalizedInput (e.g., const { username, ...withoutUsername } = normalizedInput)
and use that normalized withoutUsername in the onConflictDoUpdate.set so both
insert and update reuse the same normalized values (preserving lastUserAgent
truncation and other normalizations) while keeping the insert using
normalizedInput.
🧹 Nitpick comments (1)
apps/api/src/core/providers/pg-semaphore.provider.ts (1)
20-22: EmptyON_APP_STARThook is unnecessary.The hook does nothing. If no startup action is needed, consider omitting it or adding a comment explaining why it's required (e.g., if the provider registration pattern mandates it).
5cf6acc to
4661dd5
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/api/src/billing/services/refill/refill.service.spec.ts (1)
94-126:⚠️ Potential issue | 🟡 MinorConsolidate setup helper and match required signature.
Line 94 and Line 198 define duplicated
setup()helpers without a parameter and not at the bottom of the rootdescribe. Please collapse into a single root-levelsetupthat accepts one inline-typed parameter (even if unused).♻️ Proposed refactor
@@ - function setup() { - SemaphoreFactory.useMemory(); - const billingConfig = mock<BillingConfig>(); - const userWalletRepository = mock<UserWalletRepository>(); - const managedUserWalletService = mock<ManagedUserWalletService>(); - const managedSignerService = mock<ManagedSignerService>(); - const balancesService = mock<BalancesService>(); - const walletInitializerService = mock<WalletInitializerService>(); - const analyticsService = mock<AnalyticsService>(); - - billingConfig.FEE_ALLOWANCE_REFILL_AMOUNT = 1000; - - const service = new RefillService( - billingConfig, - userWalletRepository, - managedUserWalletService, - managedSignerService, - balancesService, - walletInitializerService, - analyticsService - ); - - return { - service, - billingConfig, - userWalletRepository, - managedUserWalletService, - managedSignerService, - balancesService, - walletInitializerService, - analyticsService - }; - } }); @@ - function setup() { - SemaphoreFactory.useMemory(); - const billingConfig = mock<BillingConfig>(); - const userWalletRepository = mock<UserWalletRepository>(); - const managedUserWalletService = mock<ManagedUserWalletService>(); - const managedSignerService = mock<ManagedSignerService>(); - const balancesService = mock<BalancesService>(); - const walletInitializerService = mock<WalletInitializerService>(); - const analyticsService = mock<AnalyticsService>(); - - billingConfig.FEE_ALLOWANCE_REFILL_AMOUNT = 1000; - - const service = new RefillService( - billingConfig, - userWalletRepository, - managedUserWalletService, - managedSignerService, - balancesService, - walletInitializerService, - analyticsService - ); - - return { - service, - billingConfig, - userWalletRepository, - managedUserWalletService, - managedSignerService, - balancesService, - walletInitializerService, - analyticsService - }; - } }); + + function setup({}: {} = {}) { + SemaphoreFactory.useMemory(); + const billingConfig = mock<BillingConfig>(); + const userWalletRepository = mock<UserWalletRepository>(); + const managedUserWalletService = mock<ManagedUserWalletService>(); + const managedSignerService = mock<ManagedSignerService>(); + const balancesService = mock<BalancesService>(); + const walletInitializerService = mock<WalletInitializerService>(); + const analyticsService = mock<AnalyticsService>(); + + billingConfig.FEE_ALLOWANCE_REFILL_AMOUNT = 1000; + + const service = new RefillService( + billingConfig, + userWalletRepository, + managedUserWalletService, + managedSignerService, + balancesService, + walletInitializerService, + analyticsService + ); + + return { + service, + billingConfig, + userWalletRepository, + managedUserWalletService, + managedSignerService, + balancesService, + walletInitializerService, + analyticsService + }; + } });As per coding guidelines, "Use
setupfunction instead ofbeforeEachin test files. Thesetupfunction must be at the bottom of the rootdescribeblock, should create an object under test and return it, accept a single parameter with inline type definition, avoid shared state, and not have a specified return type."Also applies to: 198-229
🧹 Nitpick comments (1)
apps/api/src/billing/lib/batch-signing-client/batch-signing-client.service.ts (1)
71-83: ReusegetAddress()to avoid duplicate wallet calls.
You already memoizegetAddress; reusing it here keeps all address caching in one place.♻️ Proposed change
private readonly getSemaphore = memoizeAsync(async () => { - const address = await this.wallet.getFirstAddress(); + const address = await this.getAddress(); this.semaphore = SemaphoreFactory.create(`BatchSigningClientService:${address}`); return this.semaphore; });
4661dd5 to
0f2adcd
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@apps/api/src/core/lib/pg-semaphore/pg-semaphore.ts`:
- Around line 150-155: PgSemaphore.nrInFlight() and MemorySemaphore.nrInFlight()
currently return different semantics (PgSemaphore returns waitingCount which
includes the current holder; MemorySemaphore returns sema.nrWaiting() which only
counts blocked waiters). Make them consistent: choose the intended meaning
(either "in-flight = holder + waiters" or "in-flight = waiters only"), update
PgSemaphore.waitingCount (rename to inFlightCount if adopting holder+waiters) or
change PgSemaphore.nrInFlight() to return waitingCount - 1 (if adopting
waiters-only), and update any callers like hasPendingTransactions to use the
clarified semantic; ensure both PgSemaphore.nrInFlight() and
MemorySemaphore.nrInFlight() compute the same metric and add a short comment
documenting the chosen definition.
In `@apps/api/test/setup-functional-tests.ts`:
- Around line 14-19: The module currently calls container.resolve(CORE_CONFIG)
and SemaphoreFactory.configure(...) at import time which causes side effects;
move this logic into test lifecycle hooks by declaring a module-scoped let
semaphoreClient variable, then initialize semaphoreClient =
postgres(container.resolve(CORE_CONFIG).POSTGRES_DB_URI, {...}) and call
SemaphoreFactory.configure(semaphoreClient) inside a beforeAll() hook, and clean
up in afterAll() by disposing container, awaiting semaphoreClient.end(), calling
dbService.teardown() and cacheEngine.clearAllKeyInCache(); ensure
beforeAll/afterAll are async and handle potential disposal errors.
0f2adcd to
c849011
Compare
c849011 to
7cd6bf8
Compare
Summary by CodeRabbit
New Features
Refactor
Tests
Behavior Change