fix: added auth to register holder notification api#1542
fix: added auth to register holder notification api#1542RinkalBhojani merged 13 commits intomainfrom
Conversation
📝 WalkthroughWalkthroughAdds JetStream/NATS integration (service, module, consumer, subscriber, pending-ack store, interfaces), a ClientAccessGuard applied to a notification endpoint, new env vars for NATS/Firebase, firebase-admin dependency, and minor logging/import adjustments. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant App as Notification App
participant NatsSvc as NatsService
participant JSM as JetStreamManager
participant JSCons as JetStreamConsumer
participant Pending as PendingAckStore
participant Sub as NotificationNATSSubsciber
participant DID as Holder DID / User Subject
rect rgba(200,200,255,0.5)
App->>NatsSvc: onModuleInit -> connect()
NatsSvc->>JSM: jetstreamManager()
JSM-->>NatsSvc: stream/consumer info
JSM->>JSM: ensureStream()/ensureDidStream()/ensureConsumer()
end
rect rgba(200,255,200,0.5)
JSCons->>JSM: pull/fetch messages
JSCons->>JSCons: parse subject → sessionId, event, did
alt deliveryCount > threshold
JSCons->>DID: publish to DID_STREAM / send FCM
JSCons-->>JSM: ack original message
else
JSCons->>Pending: save pending ack -> ackKey
JSCons->>NatsSvc: publish to holder DID subject (payload + ackKey)
NatsSvc->>DID: deliver via NATS to holder DID/user subject
Note right of Pending: await user ack -> Sub
Sub->>Pending: handleUserAck(ackKey) -> ack original JsMsg via JetStream
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Important Action Needed: IP Allowlist UpdateIf your organization protects your Git platform with IP whitelisting, please add the new CodeRabbit IP address to your allowlist:
Reviews will stop working after February 8, 2026 if the new IP is not added to your allowlist. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @apps/api-gateway/src/authz/guards/client-access-guard.ts:
- Around line 9-13: The guard currently assumes request.user exists and calls
user.hasOwnProperty('client_id'), which can throw if user is null/undefined or
shadows hasOwnProperty; update the check to first ensure request.user is
non-null (e.g., if (!request.user) throw new UnauthorizedException(...)) and
then verify the client_id key using a safe call such as
Object.prototype.hasOwnProperty.call(request.user, 'client_id'); keep the
existing UnauthorizedException when the check fails and reference the same
request, user and UnauthorizedException symbols.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
apps/api-gateway/src/authz/guards/client-access-guard.tsapps/api-gateway/src/notification/notification.controller.tsapps/notification/src/notification.service.ts
🧰 Additional context used
🧬 Code graph analysis (1)
apps/api-gateway/src/notification/notification.controller.ts (2)
apps/api-gateway/src/agent/agent.controller.ts (1)
ApiBearerAuth(43-251)apps/api-gateway/src/credential-definition/credential-definition.controller.ts (1)
ApiBearerAuth(42-197)
🔇 Additional comments (3)
apps/notification/src/notification.service.ts (1)
7-12: LGTM!The import additions for
HolderNotificationRepository,NotificationStatus, and the updated path for holder-notification interfaces are correctly structured and align with the existing codebase patterns.apps/api-gateway/src/notification/notification.controller.ts (2)
2-25: LGTM!The imports for
UseGuards,ApiBearerAuth,AuthGuard, andClientAccessGuardare correctly added and follow the established patterns in other controllers likeCredentialDefinitionControllerandAgentController.
97-105: LGTM! Guard ordering is correct.The authentication setup follows the correct order:
AuthGuard('jwt')runs first to authenticate and populaterequest.user, thenClientAccessGuardvalidates the presence ofclient_id. The@ApiBearerAuth()decorator properly documents the bearer token requirement in Swagger.One minor observation: the
@ApiExcludeEndpoint()decorator is commented out (Line 98), meaning this endpoint will now be visible in Swagger documentation. Confirm this is intentional as part of exposing this API.
There was a problem hiding this comment.
Actionable comments posted: 10
🤖 Fix all issues with AI agents
In `@apps/notification/src/nats/jetstream.consumer.ts`:
- Around line 47-48: The current handler calls msg.json() and logs the full
payload via this.logger.log, which may leak sensitive data; change the logging
to avoid raw payloads by either (a) logging only safe metadata derived from the
message (e.g., Object.keys(payload), payload size, message.headers) instead of
the full payload, or (b) moving the full payload log behind a debug flag using
this.logger.debug so it only appears when debugging is enabled. Update the code
that uses msg.json() and this.logger.log in the JetStream consumer message
handler to extract and log only non-sensitive metadata (or wrap full payload
logging with a debug check) and ensure no raw payload is written to standard
logs.
- Around line 50-51: The log message after publishing in jetstream.consumer.ts
is inconsistent with the subject in this.nats.publish (publishing to 'did-key'
but logging 'did:key:...'); update either the publish subject used in the call
to this.nats.publish or change the string passed to this.logger.log so both
reflect the exact same NATS subject (reference the this.nats.publish call and
the this.logger.log invocation) to avoid confusion during triage.
- Around line 13-16: The bootstrap currently calls this.nats.jetstream() inside
onApplicationBootstrap without ensuring a connection; before calling
this.nats.jetstream() await or verify the NATS connection (e.g., await
this.nats.connect() or check connection-ready) so jetstream() cannot throw;
update onApplicationBootstrap to wait for the connection, then call
this.nats.jetstream() and proceed to js.consumers.get(STREAM, CONSUMER).
In `@apps/notification/src/nats/nats-subscriber.ts`:
- Line 6: Rename the class TestSubsciber to TestSubscriber (the class implements
OnApplicationBootstrap) to fix the typo; update the class declaration/export and
any internal references or imports/usages across the codebase (tests, DI tokens,
module providers, and export statements) to use the new TestSubscriber
identifier so compilation and dependency injection keep working.
- Around line 23-24: The code is awaiting a non-Promise property (this.nats.nc);
change the logic in NatsSubscriber to obtain a ready NatsConnection safely:
remove the unnecessary await, check if this.nats.nc is defined/ready and if not
call a connection method on NatsService (e.g., ensureConnected(), connect(), or
a new getConnection() that establishes the connection) and then use the returned
NatsConnection to call nc.subscribe(subject). Update the code paths that
reference this.nats.nc (the subscription creation) to use the
guaranteed/returned connection instance.
In `@apps/notification/src/nats/nats.module.ts`:
- Line 5: The import and exported identifier are misspelled: change every
occurrence of TestSubsciber to TestSubscriber; update the import in the module
(where TestSubsciber is imported) and the declaration/export inside
nats-subscriber.ts so both files use the corrected class/name TestSubscriber
consistently.
- Around line 7-15: The NatsModule injects Logger in its constructor but Logger
is not listed in the module providers, causing DI failure; update the `@Module`
providers array (currently providers: [NatsService, JetStreamConsumer,
TestSubsciber]) to include Logger so Nest can resolve it (i.e., providers:
[NatsService, JetStreamConsumer, TestSubsciber, Logger]) or alternatively remove
constructor injection and instantiate a Logger inside NatsModule (e.g., new
Logger(NatsModule.name))—modify the module to reference the Logger symbol
accordingly.
In `@apps/notification/src/nats/nats.service.ts`:
- Around line 19-22: Validate that process.env.NATS_HOST, NATS_PORT, NATS_USER,
and NATS_PASSWORD are present before calling connect in the NatsService
initialization: if any are missing, log a clear error and throw/exit instead of
passing literal "undefined" into usernamePasswordAuthenticator. Move creation of
the servers array and the call to usernamePasswordAuthenticator to after this
check and use the validated values when calling connect (referencing the this.nc
assignment, connect, and usernamePasswordAuthenticator).
- Around line 12-15: The connect() method currently returns undefined when
this.connected is true; update it to return the existing NatsConnection instead
of nothing. In the connect() function (and any early-return branch guarded by
the this.connected flag), return the stored connection instance (e.g., this.nc
or Promise.resolve(this.nc) as appropriate) so the method always fulfills its
declared Promise<NatsConnection> contract; ensure the stored field referenced
(this.nc) is non-nullable or asserted before returning.
In `@apps/oid4vc-verification/src/oid4vc-verification.service.ts`:
- Around line 381-383: The error log in the this.logger.error call incorrectly
references [createVerificationPresentation]; update the log string to reference
the actual function name createIntentBasedVerificationPresentation so traces
match the function where the error occurred (locate the this.logger.error
invocation inside createIntentBasedVerificationPresentation and replace the
bracketed function name accordingly, preserving the JSON.stringify(error)
detail).
🧹 Nitpick comments (4)
apps/notification/src/nats/nats-subscriber.ts (1)
12-16: Placeholder implementation with commented-out code.The
onApplicationBootstrapis empty with commented code. If this is test/development code, consider either completing the implementation or removing the class until it's needed.apps/notification/src/nats/jetstream.setup.ts (3)
47-55: ESLint camelcase errors are false positives - these are NATS JetStream API property names.The snake_case identifiers (
durable_name,ack_policy,deliver_policy,ack_wait,max_deliver) are required by the NATS JetStream API. Add an eslint-disable comment to suppress these warnings.Proposed fix
logger.log(`[NATS] Creating consumer '${CONSUMER}'`); + // eslint-disable-next-line camelcase await jsm.consumers.add(STREAM, { name: CONSUMER, durable_name: CONSUMER, ack_policy: AckPolicy.Explicit, deliver_policy: DeliverPolicy.All, ack_wait: nanos(30_000), max_deliver: 5 });Or add a block disable comment above the object:
/* eslint-disable camelcase */ await jsm.consumers.add(STREAM, { name: CONSUMER, durable_name: CONSUMER, ack_policy: AckPolicy.Explicit, deliver_policy: DeliverPolicy.All, ack_wait: nanos(30_000), max_deliver: 5 }); /* eslint-enable camelcase */
15-15: Remove commented-out debug code.Lines 15, 22, 38-42, and 54 contain commented-out code that appears to be development/debugging artifacts. Clean these up before merging.
25-28: Consider making stream source domains configurable.The domains
'del'and'blr'are hardcoded. If these are environment-specific (e.g., data center regions), consider moving them to environment variables for flexibility across deployments.
| async onApplicationBootstrap(): Promise<void> { | ||
| const js = this.nats.jetstream(); // ✅ now safe | ||
|
|
||
| const consumer: Consumer = await js.consumers.get(STREAM, CONSUMER); |
There was a problem hiding this comment.
Ensure NATS connection is established before calling jetstream().
If connect() hasn’t run yet, jetstream() throws and bootstrap will fail. Safer to await a connection here (or guarantee module init ordering).
🔧 Suggested fix
async onApplicationBootstrap(): Promise<void> {
- const js = this.nats.jetstream(); // ✅ now safe
+ await this.nats.connect();
+ const js = this.nats.jetstream();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async onApplicationBootstrap(): Promise<void> { | |
| const js = this.nats.jetstream(); // ✅ now safe | |
| const consumer: Consumer = await js.consumers.get(STREAM, CONSUMER); | |
| async onApplicationBootstrap(): Promise<void> { | |
| await this.nats.connect(); | |
| const js = this.nats.jetstream(); | |
| const consumer: Consumer = await js.consumers.get(STREAM, CONSUMER); |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/jetstream.consumer.ts` around lines 13 - 16, The
bootstrap currently calls this.nats.jetstream() inside onApplicationBootstrap
without ensuring a connection; before calling this.nats.jetstream() await or
verify the NATS connection (e.g., await this.nats.connect() or check
connection-ready) so jetstream() cannot throw; update onApplicationBootstrap to
wait for the connection, then call this.nats.jetstream() and proceed to
js.consumers.get(STREAM, CONSUMER).
|
|
||
| const sd = new TextDecoder(); | ||
| @Injectable() | ||
| export class TestSubsciber implements OnApplicationBootstrap { |
There was a problem hiding this comment.
Typo in class name: TestSubsciber → TestSubscriber.
Proposed fix
-export class TestSubsciber implements OnApplicationBootstrap {
+export class TestSubscriber implements OnApplicationBootstrap {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| export class TestSubsciber implements OnApplicationBootstrap { | |
| export class TestSubscriber implements OnApplicationBootstrap { |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/nats-subscriber.ts` at line 6, Rename the class
TestSubsciber to TestSubscriber (the class implements OnApplicationBootstrap) to
fix the typo; update the class declaration/export and any internal references or
imports/usages across the codebase (tests, DI tokens, module providers, and
export statements) to use the new TestSubscriber identifier so compilation and
dependency injection keep working.
| import { NatsService } from './nats.service'; | ||
| import { JetStreamConsumer } from './jetstream.consumer'; | ||
| import { ensureConsumer, ensureStream } from './jetstream.setup'; | ||
| import { TestSubsciber } from './nats-subscriber'; |
There was a problem hiding this comment.
Typo in import: TestSubsciber → TestSubscriber.
This matches the typo in nats-subscriber.ts. Fix both together.
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/nats.module.ts` at line 5, The import and exported
identifier are misspelled: change every occurrence of TestSubsciber to
TestSubscriber; update the import in the module (where TestSubsciber is
imported) and the declaration/export inside nats-subscriber.ts so both files use
the corrected class/name TestSubscriber consistently.
There was a problem hiding this comment.
Actionable comments posted: 8
🤖 Fix all issues with AI agents
In `@apps/notification/src/nats/jetstream.setup.ts`:
- Around line 67-70: Fix the typo and make the catch log accurately reflect the
operation: in the catch block that attempts to fetch consumer info (refer to
consumerName and the logger.error call in jetstream.setup.ts), change the
message from "Error creatingg consumer info" to something like "Error fetching
consumer info" or "Consumer does not exist" (as appropriate), remove the extra
'g', and include the actual error details (error.message or the error object) to
make the log informative.
- Around line 78-80: The inactive_threshold value is currently a raw number
(inactive_threshold: 30_000) which is interpreted as nanoseconds incorrectly;
update the JetStream consumer options by wrapping the value with the nanos()
helper (use inactive_threshold: nanos(30_000)) so it is passed as a proper
duration in nanoseconds to match ack_wait, locating and modifying the
inactive_threshold property in the consumer/stream setup code (search for
inactive_threshold in jetstream.setup or the consumer configuration).
In `@apps/notification/src/nats/nats-subscriber.ts`:
- Around line 106-108: The consumer is being deleted immediately after fetching
messages which can cause message loss because messages in pendingAckStore are
not yet acknowledged by handleUserAck; instead, modify the logic around
nats.jetstreamManager().consumers.delete(STREAM, consumerName) so you only
delete the consumer after pendingAckStore for sessionId/consumerName is empty
(or after a bounded wait/timeout) or switch to a lifecycle strategy like setting
inactive_threshold on the consumer when creating it; reference pendingAckStore,
handleUserAck, consumerName, STREAM and the call to
nats.jetstreamManager().consumers.delete to implement one of these approaches
(wait-for-acks-with-timeout or auto-cleanup via inactive_threshold) to ensure
acks succeed before deletion.
- Around line 17-27: In onApplicationBootstrap, await the asynchronous
subscribe(...) calls so subscriptions are established before the bootstrap
completes: change the two subscribe invocations inside onApplicationBootstrap to
await this.subscribe(...) and keep their async callbacks as-is; also fix the log
message typos by replacing "Subsciber" with "Subscriber" in the logger.log calls
for the 'active.user' and 'user.ack' handlers to ensure correct spelling and
consistent logs (references: onApplicationBootstrap, this.subscribe,
activateUserConsumers, handleUserAck, this.logger.log).
- Line 96: The code saves pending acks using the wrong stream name literal
'notify' which mismatches the actual JetStream stream 'aggregate', causing
retrieval failures; update the call to pendingAckStore.save(...) in
nats-subscriber.ts to use the correct stream identifier (the same STREAM used in
jetstream.setup, i.e., 'aggregate' or the shared constant if available) so
ackKey is stored under the correct stream; reference pendingAckStore.save,
ackKey, consumerName, and msg when making the change and prefer using the
exported STREAM constant instead of a hard-coded string.
- Line 94: Destructuring assigns a local sessionId that shadows the function
parameter sessionId; rename the destructured identifier (e.g., subjectSessionId
or parsedSessionId) in the destructuring expression (the line with const [_,
domain, event, orgCode, sessionId] = msg.subject.split('.')) and update any
subsequent uses in this function to reference the new name so the original
function parameter sessionId remains unshadowed.
In `@apps/notification/src/nats/nats.module.ts`:
- Line 24: The CacheModule.register call uses process.env.REDIS_PORT as a
string; change the configuration in the CacheModule.register invocation (the
call named CacheModule.register) to pass a numeric port by parsing
process.env.REDIS_PORT (e.g., Number(...) or parseInt(..., 10)) and handle
missing/invalid values (fallback or throw) so the port passed is an integer
rather than a string.
In `@apps/notification/src/nats/pendingAckStore.ts`:
- Around line 19-29: The save method uses a non-null assertion on msg.info
(const info = msg.info!) which can throw if info is missing; update save to
guard against absent info by checking msg.info (or info = msg.info) before using
it, and handle the error path (e.g., log and return/throw a clear error or
reject the save) rather than asserting; ensure you use the existing identifiers
(save, makeKey, PendingKey, msg.info) so the code locates the guard, and keep
the rest of the logic (store.set and returned key) only when info is present.
🧹 Nitpick comments (4)
apps/notification/src/nats/pendingAckStore.ts (1)
11-37: Unbounded in-memory store may cause memory leak.The
receivedAtfield is stored but never used. Without a TTL or periodic cleanup mechanism, unacknowledged messages will accumulate indefinitely, leading to memory growth in long-running processes.Consider adding a cleanup method or scheduled task to purge stale entries based on
receivedAt.apps/notification/src/nats/nats.module.ts (1)
42-42: Remove or document commented-out code.The
ensureConsumercall is commented out without explanation. Either remove the dead code or add a TODO comment explaining why it's disabled and when it should be re-enabled.apps/notification/src/nats/jetstream.consumer.ts (1)
12-29: Most of the bootstrap logic is commented out.The
onApplicationBootstrapmethod is essentially a no-op, only logging startup. If this consumer is not yet ready for use, consider removing it from the module providers until implementation is complete, or add a clear TODO explaining the intended behavior.apps/notification/src/nats/jetstream.setup.ts (1)
14-31: Catch block assumes all errors mean "stream not found".The current pattern catches any error and proceeds to create the stream. However, errors like permission denied or network issues will be silently masked, and stream creation may fail with a confusing error.
🔧 Proposed fix—check for specific error
try { await jsm.streams.info(STREAM); logger.log(`[NATS] Stream '${STREAM}' already exists`); - } catch { + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + if (!message.includes('stream not found')) { + logger.error(`[NATS] Unexpected error checking stream: ${message}`); + throw err; + } logger.log(`[NATS] Creating stream '${STREAM}'`);
| replay_policy: ReplayPolicy.Instant, | ||
| max_ack_pending: 1000, | ||
| inactive_threshold: 30_000 |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
fd -t f jetstream.setup.tsRepository: credebl/platform
Length of output: 105
🏁 Script executed:
cat -n apps/notification/src/nats/jetstream.setup.ts | sed -n '45,85p'Repository: credebl/platform
Length of output: 1694
🌐 Web query:
NATS JetStream consumer inactive_threshold unit nanoseconds
💡 Result:
inactive_threshold in a JetStream consumer config is encoded as a signed 64-bit integer representing a duration in nanoseconds (nanos since it’s a “Go-style duration” on the wire). The JetStream protocol schema explicitly documents it as “nanoseconds depicting a duration in time, signed 64 bit integer.” [1]
Many client libraries surface it as a Duration type, but when serialized to the JetStream API/protocol it’s still nanoseconds. [1][2]
Example: inactive_threshold: 5_000_000_000 means 5 seconds. [1]
Sources: [1] https://beta-docs.nats.io/ref/protocols/jetstream, [2] https://nats-io.github.io/nats.js/jetstream/types/StreamConsumerLimits.html
🏁 Script executed:
rg -t ts "function nanos|const nanos|export.*nanos" apps/notification/src/nats/Repository: credebl/platform
Length of output: 42
🏁 Script executed:
rg -t ts "function nanos|const nanos|export.*nanos" --max-count 5Repository: credebl/platform
Length of output: 42
🏁 Script executed:
rg "nanos" apps/notification/src/nats/ -t tsRepository: credebl/platform
Length of output: 300
🌐 Web query:
NATS JavaScript nanos function convert milliseconds nanoseconds
💡 Result:
In the NATS JavaScript client (@nats-io/nats-core), the helper is:
export function nanos(millis: number): Nanos {
return millis * 1000000;
}So milliseconds → nanoseconds = millis * 1_000_000. (jsr.io)
Example:
const ns = nanos(1500); // 1,500,000,000 (1.5s)Note: Nanos is just a number, and the docs warn this conversion can overflow in some environments for very large values. (jsr.io)
Citations:
- 1: https://jsr.io/%40nats-io/nats-core/3.3.0/src/util.ts
- 2: https://jsr.io/%40nats-io/nats-core/doc/internal/~/Nanos
Add nanos() wrapper to inactive_threshold.
The inactive_threshold: 30_000 is missing the nanos() wrapper and should be inactive_threshold: nanos(30_000). NATS JetStream expects durations in nanoseconds. Without the wrapper, the value is 30 microseconds instead of 30 seconds, matching the ack_wait timeout duration.
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/jetstream.setup.ts` around lines 78 - 80, The
inactive_threshold value is currently a raw number (inactive_threshold: 30_000)
which is interpreted as nanoseconds incorrectly; update the JetStream consumer
options by wrapping the value with the nanos() helper (use inactive_threshold:
nanos(30_000)) so it is passed as a proper duration in nanoseconds to match
ack_wait, locating and modifying the inactive_threshold property in the
consumer/stream setup code (search for inactive_threshold in jetstream.setup or
the consumer configuration).
| async onApplicationBootstrap(): Promise<void> { | ||
| this.subscribe('active.user', async (data) => { | ||
| this.logger.log(`[NATS] activate.user Subsciber received data: ${JSON.stringify(data)}`); | ||
| await this.activateUserConsumers(data as INatsUserRequestData); | ||
| }); | ||
|
|
||
| this.subscribe('user.ack', async (data) => { | ||
| this.logger.log('[NATS] user.ack Subsciber received data:', data); | ||
| await this.handleUserAck(data as { ackKey: string }); | ||
| }); | ||
| } |
There was a problem hiding this comment.
Missing await for subscription setup and typos in log messages.
-
The
subscribe()calls are not awaited, soonApplicationBootstrapcompletes before subscriptions are established. This could cause issues if other components expect subscriptions to be ready. -
Typo "Subsciber" should be "Subscriber" in both log messages (lines 19 and 24).
Proposed fix
async onApplicationBootstrap(): Promise<void> {
- this.subscribe('active.user', async (data) => {
- this.logger.log(`[NATS] activate.user Subsciber received data: ${JSON.stringify(data)}`);
+ await this.subscribe('active.user', async (data) => {
+ this.logger.log(`[NATS] active.user Subscriber received data: ${JSON.stringify(data)}`);
await this.activateUserConsumers(data as INatsUserRequestData);
});
- this.subscribe('user.ack', async (data) => {
- this.logger.log('[NATS] user.ack Subsciber received data:', data);
+ await this.subscribe('user.ack', async (data) => {
+ this.logger.log('[NATS] user.ack Subscriber received data:', data);
await this.handleUserAck(data as { ackKey: string });
});
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async onApplicationBootstrap(): Promise<void> { | |
| this.subscribe('active.user', async (data) => { | |
| this.logger.log(`[NATS] activate.user Subsciber received data: ${JSON.stringify(data)}`); | |
| await this.activateUserConsumers(data as INatsUserRequestData); | |
| }); | |
| this.subscribe('user.ack', async (data) => { | |
| this.logger.log('[NATS] user.ack Subsciber received data:', data); | |
| await this.handleUserAck(data as { ackKey: string }); | |
| }); | |
| } | |
| async onApplicationBootstrap(): Promise<void> { | |
| await this.subscribe('active.user', async (data) => { | |
| this.logger.log(`[NATS] active.user Subscriber received data: ${JSON.stringify(data)}`); | |
| await this.activateUserConsumers(data as INatsUserRequestData); | |
| }); | |
| await this.subscribe('user.ack', async (data) => { | |
| this.logger.log('[NATS] user.ack Subscriber received data:', data); | |
| await this.handleUserAck(data as { ackKey: string }); | |
| }); | |
| } |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/nats-subscriber.ts` around lines 17 - 27, In
onApplicationBootstrap, await the asynchronous subscribe(...) calls so
subscriptions are established before the bootstrap completes: change the two
subscribe invocations inside onApplicationBootstrap to await this.subscribe(...)
and keep their async callbacks as-is; also fix the log message typos by
replacing "Subsciber" with "Subscriber" in the logger.log calls for the
'active.user' and 'user.ack' handlers to ensure correct spelling and consistent
logs (references: onApplicationBootstrap, this.subscribe, activateUserConsumers,
handleUserAck, this.logger.log).
| LoggerModule, | ||
| PlatformConfig, | ||
| ContextInterceptorModule, | ||
| CacheModule.register({ host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }) |
There was a problem hiding this comment.
Redis port should be parsed as integer.
process.env.REDIS_PORT returns a string, but cache configuration typically expects a numeric port.
🔧 Proposed fix
- CacheModule.register({ host: process.env.REDIS_HOST, port: process.env.REDIS_PORT })
+ CacheModule.register({ host: process.env.REDIS_HOST, port: parseInt(process.env.REDIS_PORT || '6379', 10) })📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| CacheModule.register({ host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }) | |
| CacheModule.register({ host: process.env.REDIS_HOST, port: parseInt(process.env.REDIS_PORT || '6379', 10) }) |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/nats.module.ts` at line 24, The
CacheModule.register call uses process.env.REDIS_PORT as a string; change the
configuration in the CacheModule.register invocation (the call named
CacheModule.register) to pass a numeric port by parsing process.env.REDIS_PORT
(e.g., Number(...) or parseInt(..., 10)) and handle missing/invalid values
(fallback or throw) so the port passed is an integer rather than a string.
There was a problem hiding this comment.
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In `@apps/notification/src/nats/jetstream.consumer.ts`:
- Line 125: The call to pendingAckStore.save currently hardcodes the stream name
'notify' causing mismatches with lookups (pendingAckStore.get) and the actual
stream; update the save call to use the STREAM constant instead of the literal
string. Locate the line with pendingAckStore.save('notify', consumerName, msg)
in jetstream.consumer.ts and replace the literal with STREAM so the invocation
becomes pendingAckStore.save(STREAM, consumerName, msg), keeping consumerName
and msg unchanged.
- Around line 112-120: The send Promise returned by
admin.messaging().send(notificationPayload) is not awaited, so failures are
logged but the message is still acknowledged; update the handler in jetstream
consumer to await admin.messaging().send(...) (or use async/await) and only
acknowledge the JetStream message (or call the ack function) after a successful
send, and on error record/emit the failure and NAK/requeue or avoid acking so
the message can be retried; reference admin.messaging().send, the surrounding
.then/.catch block, and the code path that calls the message ack to implement
this change.
- Around line 34-35: Remove the commented-out developer-local credential path
string "admin.credential.cert('/home/rinkal-bhojani/...')" from the code (the
commented credential line in the JetStream consumer initialization block) to
avoid exposing local filesystem/developer info; leave the configuration object
clean or replace with a generic placeholder or environment-driven reference
(e.g., use process.env or a secure config variable) and ensure no other
hard-coded local paths remain in the same block.
- Around line 128-132: Move the null-check for notificationDetail up to
immediately after the call to getHolderNotificationBySessionId(sessionId) (the
call around line 75) so you validate notificationDetail before any use such as
notificationDetail.holderDid, then delete the later duplicate null-check and
msg.nak() block (the one currently around lines 128-132) to avoid
unreachable/duplicated logic; ensure the early check logs the same error and
naks the message if notificationDetail is missing.
- Around line 27-36: The initialization block using admin.initializeApp assumes
process.env.PRIVATE_KEY (and other creds) exist and calls
process.env.PRIVATE_KEY.replace(...), which will throw if PRIVATE_KEY is
undefined; before calling admin.initializeApp (inside the admin.apps.length
check) validate that PROJECT_ID, CLIENT_EMAIL, and PRIVATE_KEY are present, and
only then call admin.initializeApp with privateKey set to
process.env.PRIVATE_KEY.replace(/\\n/g,'\n'); if any are missing, log a clear
error via the existing logger (or throw) and avoid calling replace—fail fast
with an explicit message referencing process.env.PRIVATE_KEY,
process.env.CLIENT_EMAIL, and process.env.PROJECT_ID so the issue is obvious.
In `@apps/notification/src/nats/jetstream.setup.ts`:
- Line 174: The code sets dataSize: payload.length but payload is typed unknown
and may be an object, so .length will be undefined; replace this with a reliable
byte-size calculation: detect Buffer/string/other then compute size (use
Buffer.byteLength for strings, buffer.length for Buffers, and
Buffer.byteLength(JSON.stringify(payload), 'utf8') for objects) and assign that
to dataSize; update the code around the payload/dataSize assignment (the
variable names "payload" and "dataSize" in the JetStream publishing logic) to
perform these checks and use the computed byte length.
In `@apps/notification/src/nats/nats-subscriber.ts`:
- Line 163: In consumeSessionMessagesWithDidNotify replace the hard-coded stream
name 'notify' passed to pendingAckStore.save with the DID_STREAM constant so the
pending ack is saved against the same stream being consumed; update the call in
pendingAckStore.save(...) to use DID_STREAM (refer to the DID_STREAM symbol and
the pendingAckStore.save invocation) to ensure stream names match.
🧹 Nitpick comments (2)
apps/notification/src/nats/nats-subscriber.ts (1)
130-142: Remove unused methodactivateUserConsumersWithDidNotify.This private method is defined but never called anywhere in the codebase. Removing it will reduce maintenance burden without affecting functionality.
apps/notification/package.json (1)
6-8: Consider pinning firebase-admin to an exact version for reproducible builds.The
^13.6.0version range allows patch updates. Since 13.6.0 is currently the latest version, changing this to"firebase-admin": "13.6.0"(without the caret) would eliminate any future variation from patch releases and improve build reproducibility.
| admin | ||
| .messaging() | ||
| .send(notificationPayload) | ||
| .then((response) => { | ||
| this.logger.log('Successfully sent message:', response); | ||
| }) | ||
| .catch((error) => { | ||
| this.logger.error('Error sending message:', error); | ||
| }); |
There was a problem hiding this comment.
FCM notification send is fire-and-forget — errors may be silently swallowed.
The Promise from admin.messaging().send() is not awaited, so if it fails, the error is logged but the message is still acknowledged at line 121. This could lead to lost notifications without proper tracking.
🔧 Proposed fix
- admin
- .messaging()
- .send(notificationPayload)
- .then((response) => {
- this.logger.log('Successfully sent message:', response);
- })
- .catch((error) => {
- this.logger.error('Error sending message:', error);
- });
- msg.ack();
+ try {
+ const response = await admin.messaging().send(notificationPayload);
+ this.logger.log('Successfully sent message:', response);
+ msg.ack();
+ } catch (fcmError) {
+ this.logger.error('Error sending FCM message:', fcmError);
+ msg.nak(); // Retry the message
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| admin | |
| .messaging() | |
| .send(notificationPayload) | |
| .then((response) => { | |
| this.logger.log('Successfully sent message:', response); | |
| }) | |
| .catch((error) => { | |
| this.logger.error('Error sending message:', error); | |
| }); | |
| try { | |
| const response = await admin.messaging().send(notificationPayload); | |
| this.logger.log('Successfully sent message:', response); | |
| msg.ack(); | |
| } catch (fcmError) { | |
| this.logger.error('Error sending FCM message:', fcmError); | |
| msg.nak(); // Retry the message | |
| } |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/jetstream.consumer.ts` around lines 112 - 120, The
send Promise returned by admin.messaging().send(notificationPayload) is not
awaited, so failures are logged but the message is still acknowledged; update
the handler in jetstream consumer to await admin.messaging().send(...) (or use
async/await) and only acknowledge the JetStream message (or call the ack
function) after a successful send, and on error record/emit the failure and
NAK/requeue or avoid acking so the message can be retried; reference
admin.messaging().send, the surrounding .then/.catch block, and the code path
that calls the message ack to implement this change.
| } else { | ||
| // ------------- Publishing Messages via NATS ----------------// | ||
|
|
||
| const ackKey = this.pendingAckStore.save('notify', consumerName, msg); |
There was a problem hiding this comment.
Stream name mismatch: uses 'notify' instead of STREAM constant.
For consistency with pendingAckStore.get() lookups and to match the actual stream name ('aggregate'), use the STREAM constant.
🔧 Proposed fix
- const ackKey = this.pendingAckStore.save('notify', consumerName, msg);
+ const ackKey = this.pendingAckStore.save(STREAM, consumerName, msg);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const ackKey = this.pendingAckStore.save('notify', consumerName, msg); | |
| const ackKey = this.pendingAckStore.save(STREAM, consumerName, msg); |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/jetstream.consumer.ts` at line 125, The call to
pendingAckStore.save currently hardcodes the stream name 'notify' causing
mismatches with lookups (pendingAckStore.get) and the actual stream; update the
save call to use the STREAM constant instead of the literal string. Locate the
line with pendingAckStore.save('notify', consumerName, msg) in
jetstream.consumer.ts and replace the literal with STREAM so the invocation
becomes pendingAckStore.save(STREAM, consumerName, msg), keeping consumerName
and msg unchanged.
| subject, | ||
| stream: ack.stream, | ||
| sequence: ack.seq, | ||
| dataSize: payload.length, |
There was a problem hiding this comment.
payload.length is undefined for object payloads.
payload is typed as unknown, and when it's an object, .length will be undefined. Consider using a more accurate size calculation.
🔧 Proposed fix
- dataSize: payload.length,
+ dataSize: JSON.stringify(payload).length,📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| dataSize: payload.length, | |
| dataSize: JSON.stringify(payload).length, |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/jetstream.setup.ts` at line 174, The code sets
dataSize: payload.length but payload is typed unknown and may be an object, so
.length will be undefined; replace this with a reliable byte-size calculation:
detect Buffer/string/other then compute size (use Buffer.byteLength for strings,
buffer.length for Buffers, and Buffer.byteLength(JSON.stringify(payload),
'utf8') for objects) and assign that to dataSize; update the code around the
payload/dataSize assignment (the variable names "payload" and "dataSize" in the
JetStream publishing logic) to perform these checks and use the computed byte
length.
| // eslint-disable-next-line @typescript-eslint/no-unused-vars | ||
| const [_, domain, event, orgCode, sessionId] = msg.subject.split('.'); | ||
|
|
||
| const ackKey = this.pendingAckStore.save('notify', consumerName, msg); |
There was a problem hiding this comment.
Stream name mismatch in consumeSessionMessagesWithDidNotify.
Uses 'notify' but should use DID_STREAM to match the stream being consumed.
🔧 Proposed fix
- const ackKey = this.pendingAckStore.save('notify', consumerName, msg);
+ const ackKey = this.pendingAckStore.save(DID_STREAM, consumerName, msg);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const ackKey = this.pendingAckStore.save('notify', consumerName, msg); | |
| const ackKey = this.pendingAckStore.save(DID_STREAM, consumerName, msg); |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/nats-subscriber.ts` at line 163, In
consumeSessionMessagesWithDidNotify replace the hard-coded stream name 'notify'
passed to pendingAckStore.save with the DID_STREAM constant so the pending ack
is saved against the same stream being consumed; update the call in
pendingAckStore.save(...) to use DID_STREAM (refer to the DID_STREAM symbol and
the pendingAckStore.save invocation) to ensure stream names match.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@apps/notification/src/nats/jetstream.setup.ts`:
- Around line 74-77: The log message inside the catch is inconsistent: it
references CONSUMER but the code actually creates PULL_CONSUMER via
jsm.consumers.add; update the log to reference the correct identifier
(PULL_CONSUMER) or include both names for clarity, e.g., log that you're
creating PULL_CONSUMER before calling jsm.consumers.add(STREAM, { name:
PULL_CONSUMER }) so logs accurately reflect the created consumer.
- Line 162: The current logger.log call in jetstream.setup.ts prints the entire
payload (logger.log(`------------Publishing to JetStream------------ ${subject}
with payload: ${JSON.stringify(payload)}`)), which risks exposing sensitive
data; change it to avoid serializing full payload by logging only safe metadata
(e.g., subject, payload size, non-sensitive fields, or a truncated/hashed
preview) and/or implement a sanitizer that strips PII or tokens before logging;
update the logger.log invocation to reference the sanitized or minimal info
instead of JSON.stringify(payload).
- Line 133: The consumer filter_subject is using the wrong prefix so it never
matches DID_STREAM messages; in ensureSessionConsumerWithDidStream() update the
filter_subject construction to use the DID_STREAM subject prefix (e.g., start
with "did-notify.*" or match the DID_STREAM pattern `did-notify.>`) instead of
`notify.*.*.*.${sessionId}`, and ensure sessionId is incorporated to match the
intended subject pattern used by DID_STREAM so the created consumer will
actually receive messages.
🧹 Nitpick comments (1)
apps/notification/src/nats/jetstream.setup.ts (1)
38-41: Consider making source domains configurable.The domains
'del'and'blr'are hardcoded. For different deployment environments, these may need to vary. Consider extracting them to environment variables or configuration.
| // durable_name: consumerName, | ||
| ack_policy: AckPolicy.Explicit, | ||
| deliver_policy: DeliverPolicy.All, | ||
| filter_subject: `notify.*.*.*.${sessionId}`, |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Search for DID_STREAM configuration and publishing patterns
rg -n "DID_STREAM|did-notify" --type ts -A 3 -B 3Repository: credebl/platform
Length of output: 6386
🏁 Script executed:
# Also check the specific file mentioned to see the context around line 55 and 133
wc -l apps/notification/src/nats/jetstream.setup.tsRepository: credebl/platform
Length of output: 109
🏁 Script executed:
# Read the jetstream.setup.ts file to see both line 55 and line 133
cat -n apps/notification/src/nats/jetstream.setup.tsRepository: credebl/platform
Length of output: 7304
Fix filter subject prefix to match DID_STREAM subjects.
The consumer created with ensureSessionConsumerWithDidStream() is added to the DID_STREAM (which has subjects did-notify.> at line 55), but the filter_subject uses the wrong prefix: notify.*.*.*.${sessionId} instead of did-notify.*. This mismatch means the consumer will never receive any messages.
🔧 Proposed fix
deliver_policy: DeliverPolicy.All,
- filter_subject: `notify.*.*.*.${sessionId}`,
+ filter_subject: `${DID_STREAM}.*.*.*.${sessionId}`,
replay_policy: ReplayPolicy.Instant,📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| filter_subject: `notify.*.*.*.${sessionId}`, | |
| deliver_policy: DeliverPolicy.All, | |
| filter_subject: `did-notify.*.*.*.${sessionId}`, | |
| replay_policy: ReplayPolicy.Instant, |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/jetstream.setup.ts` at line 133, The consumer
filter_subject is using the wrong prefix so it never matches DID_STREAM
messages; in ensureSessionConsumerWithDidStream() update the filter_subject
construction to use the DID_STREAM subject prefix (e.g., start with
"did-notify.*" or match the DID_STREAM pattern `did-notify.>`) instead of
`notify.*.*.*.${sessionId}`, and ensure sessionId is incorporated to match the
intended subject pattern used by DID_STREAM so the created consumer will
actually receive messages.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@libs/service/nats.options.ts`:
- Line 8: The servers value currently uses `${process.env.NATS_URL}`.split(',')
which yields "undefined" or empty entries; update the code that sets the servers
property (the servers field in the NATS options) to first read
process.env.NATS_URL, validate it exists (throw or log a clear config error if
missing), then split by ',' and map each entry through trim(), filtering out
empty strings before assigning to servers; apply the same defensive pattern to
other occurrences that parse NATS_URL (e.g., nats.config.ts, nats.service.ts,
fido.module.ts) to ensure only non-empty, trimmed URLs are passed to NATS.js.
🧹 Nitpick comments (1)
apps/notification/src/nats/nats.service.ts (1)
52-57:asynckeyword is misleading;nc.publish()is synchronous.The method declares
asyncbutnc.publish()is synchronous (returnsvoid), so the promise resolves immediately without waiting for message delivery. If you need delivery confirmation, use JetStream'spublish()instead which returns aPubAck.Also, the hardcoded reply subject
'ack-notification'reduces flexibility for other use cases.♻️ Option: Use JetStream publish for delivery guarantees
- async publish(subject: string, payload: unknown): Promise<void> { + async publish(subject: string, payload: unknown, reply?: string): Promise<void> { if (!this.connected) { throw new Error('NATS not connected yet'); } - this.nc.publish(subject, Buffer.from(JSON.stringify(payload)), { reply: 'ack-notification' }); + const data = new TextEncoder().encode(JSON.stringify(payload)); + if (reply) { + this.nc.publish(subject, data, { reply }); + } else { + await this.js.publish(subject, data); // JetStream publish with ack + } }
There was a problem hiding this comment.
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In @.env.demo:
- Around line 29-30: Swap the two NATS environment variable lines so that
NATS_PASSWORD appears before NATS_USER to satisfy dotenv-linter ordering;
specifically reorder the entries referencing NATS_PASSWORD and NATS_USER in the
.env demo so the line with NATS_PASSWORD precedes the line with NATS_USER.
- Around line 246-248: Remove the trailing semicolons from the stream
environment values so the variable values match expected stream names; update
AGGREGATE_STREAM, DID_STREAM, and PULL_CONSUMER to be "aggregate", "did-notify",
and "hub-pull-consumer" respectively (remove the ";" characters) so JetStream
setup and code lookups use the correct names.
- Around line 243-244: The env value for CONSUMER_CONFIG_ACK_WAIT uses an
underscore-style numeric literal which causes Number("10_000") to return NaN and
makes ack_wait invalid; change the value to a plain numeric string like "10000"
(quoted to satisfy dotenv-linter) so code that parses
Number(process.env.CONSUMER_CONFIG_ACK_WAIT) yields a valid number for ack_wait.
- Around line 236-242: Remove the extra blank lines and reorder the three
environment variables so CLIENT_EMAIL and PRIVATE_KEY appear before PROJECT_ID;
specifically, consolidate the block into a single contiguous group with the keys
in the order CLIENT_EMAIL, PRIVATE_KEY, PROJECT_ID and no extra empty lines to
satisfy dotenv-linter.
In @.env.sample:
- Around line 62-63: Swap the two NATS variables in .env.sample so NATS_PASSWORD
appears before NATS_USER: reorder the lines containing NATS_PASSWORD and
NATS_USER (ensure the file now lists NATS_PASSWORD=xxxx followed by
NATS_USER=xxxx) to satisfy dotenv-linter expectations.
- Around line 267-269: Update the three environment variable values in
.env.sample so their stream names do not include trailing semicolons: remove the
ending ";" from AGGREGATE_STREAM, DID_STREAM, and PULL_CONSUMER values so the
variables read plain stream names (e.g., AGGREGATE_STREAM=aggregate,
DID_STREAM=did-notify, PULL_CONSUMER=hub-pull-consumer).
- Around line 264-265: The CONSUMER_CONFIG_ACK_WAIT value in .env.sample is set
to 10_000 which becomes NaN when parsed (e.g., Number("10_000")), so change the
variable CONSUMER_CONFIG_ACK_WAIT to a plain numeric string (e.g., 10000) and
quote it so the parser receives a valid number; ensure any code that reads this
env (ack_wait) still expects a numeric string and converts it (Number or
parseInt) accordingly.
- Around line 258-263: Remove the extra blank lines and reorder the keys in
.env.sample so that CLIENT_EMAIL and PRIVATE_KEY appear before PROJECT_ID (no
blank lines between the three entries), i.e. place CLIENT_EMAIL= then
PRIVATE_KEY= then PROJECT_ID= on consecutive lines to satisfy dotenv-linter key
ordering and spacing rules; keep the variable names exactly as shown
(CLIENT_EMAIL, PRIVATE_KEY, PROJECT_ID).
In `@apps/notification/src/nats/jetstream.setup.ts`:
- Around line 96-100: The call to jsc.publish currently passes
JSON.stringify(payload) as a string; change it to pass
Buffer.from(JSON.stringify(payload)) to match the encoding pattern used
elsewhere (e.g., nats.service.ts) so the payload is consistently sent as bytes.
Update the publish call that assigns ack from jsc.publish(subject, ...) to use
Buffer.from(JSON.stringify(payload)) (referencing jsc.publish, subject, payload,
and ack) and ensure any TypeScript types/imports remain valid after the change.
| CONSUMER_CONFIG_ACK_WAIT=10_000 # IN nanos(10_000) | ||
| CONSUMER_CONFIG_MAX_DELIVER=4 |
There was a problem hiding this comment.
Fix CONSUMER_CONFIG_ACK_WAIT to avoid NaN.
Number("10_000") returns NaN, so ack_wait becomes invalid. Use a plain numeric string (and quote it to satisfy dotenv-linter).
✅ Suggested fix
-CONSUMER_CONFIG_ACK_WAIT=10_000 # IN nanos(10_000)
+CONSUMER_CONFIG_ACK_WAIT="10000" # milliseconds; parsed via Number(...)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| CONSUMER_CONFIG_ACK_WAIT=10_000 # IN nanos(10_000) | |
| CONSUMER_CONFIG_MAX_DELIVER=4 | |
| CONSUMER_CONFIG_ACK_WAIT="10000" # milliseconds; parsed via Number(...) | |
| CONSUMER_CONFIG_MAX_DELIVER=4 |
🧰 Tools
🪛 dotenv-linter (4.0.0)
[warning] 243-243: [ValueWithoutQuotes] This value needs to be surrounded in quotes
(ValueWithoutQuotes)
🤖 Prompt for AI Agents
In @.env.demo around lines 243 - 244, The env value for CONSUMER_CONFIG_ACK_WAIT
uses an underscore-style numeric literal which causes Number("10_000") to return
NaN and makes ack_wait invalid; change the value to a plain numeric string like
"10000" (quoted to satisfy dotenv-linter) so code that parses
Number(process.env.CONSUMER_CONFIG_ACK_WAIT) yields a valid number for ack_wait.
|
|
||
|
|
||
| PROJECT_ID= | ||
| CLIENT_EMAIL= | ||
| PRIVATE_KEY= | ||
|
|
There was a problem hiding this comment.
Clean up the new env block ordering/spacing.
dotenv-linter flags extra blank lines and key ordering (CLIENT_EMAIL/PRIVATE_KEY before PROJECT_ID).
🧹 Suggested cleanup
-
-
-PROJECT_ID=
-CLIENT_EMAIL=
-PRIVATE_KEY=
+CLIENT_EMAIL=
+PRIVATE_KEY=
+PROJECT_ID=📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| PROJECT_ID= | |
| CLIENT_EMAIL= | |
| PRIVATE_KEY= | |
| CLIENT_EMAIL= | |
| PRIVATE_KEY= | |
| PROJECT_ID= | |
🧰 Tools
🪛 dotenv-linter (4.0.0)
[warning] 259-259: [ExtraBlankLine] Extra blank line detected
(ExtraBlankLine)
[warning] 261-261: [UnorderedKey] The CLIENT_EMAIL key should go before the PROJECT_ID key
(UnorderedKey)
[warning] 262-262: [UnorderedKey] The PRIVATE_KEY key should go before the PROJECT_ID key
(UnorderedKey)
🤖 Prompt for AI Agents
In @.env.sample around lines 258 - 263, Remove the extra blank lines and reorder
the keys in .env.sample so that CLIENT_EMAIL and PRIVATE_KEY appear before
PROJECT_ID (no blank lines between the three entries), i.e. place CLIENT_EMAIL=
then PRIVATE_KEY= then PROJECT_ID= on consecutive lines to satisfy dotenv-linter
key ordering and spacing rules; keep the variable names exactly as shown
(CLIENT_EMAIL, PRIVATE_KEY, PROJECT_ID).
| CONSUMER_CONFIG_ACK_WAIT=10_000 # IN nanos(10_000) | ||
| CONSUMER_CONFIG_MAX_DELIVER=4 |
There was a problem hiding this comment.
Fix CONSUMER_CONFIG_ACK_WAIT to avoid NaN.
Number("10_000") returns NaN, so ack_wait becomes invalid. Use a plain numeric string and quote it.
✅ Suggested fix
-CONSUMER_CONFIG_ACK_WAIT=10_000 # IN nanos(10_000)
+CONSUMER_CONFIG_ACK_WAIT="10000" # milliseconds; parsed via Number(...)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| CONSUMER_CONFIG_ACK_WAIT=10_000 # IN nanos(10_000) | |
| CONSUMER_CONFIG_MAX_DELIVER=4 | |
| CONSUMER_CONFIG_ACK_WAIT="10000" # milliseconds; parsed via Number(...) | |
| CONSUMER_CONFIG_MAX_DELIVER=4 |
🧰 Tools
🪛 dotenv-linter (4.0.0)
[warning] 264-264: [ValueWithoutQuotes] This value needs to be surrounded in quotes
(ValueWithoutQuotes)
🤖 Prompt for AI Agents
In @.env.sample around lines 264 - 265, The CONSUMER_CONFIG_ACK_WAIT value in
.env.sample is set to 10_000 which becomes NaN when parsed (e.g.,
Number("10_000")), so change the variable CONSUMER_CONFIG_ACK_WAIT to a plain
numeric string (e.g., 10000) and quote it so the parser receives a valid number;
ensure any code that reads this env (ack_wait) still expects a numeric string
and converts it (Number or parseInt) accordingly.
| const ack = await jsc.publish( | ||
| `${subject}`, | ||
| // sc.encode(JSON.stringify(payload)) | ||
| JSON.stringify(payload) | ||
| ); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's examine the file in question
cat -n apps/notification/src/nats/jetstream.setup.ts | head -120 | tail -40Repository: credebl/platform
Length of output: 1629
🏁 Script executed:
# Check package.json to understand NATS library version and dependencies
cat package.json | grep -A 5 -B 5 "nats"Repository: credebl/platform
Length of output: 401
🏁 Script executed:
# Search for other publish calls in the codebase to see how they handle encoding
rg "\.publish\(" apps/notification/src/ -B 2 -A 2Repository: credebl/platform
Length of output: 1778
🏁 Script executed:
# Check if there are type definitions or imports from nats library
rg "from ['\"](nats|@nats)" apps/notification/src/ -A 2Repository: credebl/platform
Length of output: 957
🏁 Script executed:
# Check if there's a type definition file or look for sc variable definition
cat -n apps/notification/src/nats/jetstream.setup.ts | head -30Repository: credebl/platform
Length of output: 1439
🏁 Script executed:
# Search for StringCodec or sc in nats related files
rg "StringCodec|sc\s*=" apps/notification/src/nats/Repository: credebl/platform
Length of output: 153
🌐 Web query:
NATS JetStreamClient publish method signature nats 2.15.1 TypeScript
💡 Result:
In NATS JavaScript/TypeScript (nats.js), JetStreamClient.publish is:
publish(
subj: string,
payload?: Payload,
options?: Partial<JetStreamPublishOptions>,
): Promise<PubAck>;Source: JetStreamClient type definition in @nats-io/jetstream docs. [1]
References
🏁 Script executed:
# Let's check the nats package to see what Payload type is
rg "type Payload|interface Payload" apps/notification/node_modules/nats/ 2>/dev/null || echo "Checking node_modules not available, will search for usage patterns"Repository: credebl/platform
Length of output: 127
🌐 Web query:
NATS.js Payload type definition string Uint8Array Buffer
💡 Result:
In NATS.js (the nats / @nats-io/* JavaScript clients):
-
The payload type used by APIs like
publish()/respond()is typically:type Payload = Uint8Array | string;
(tessl.io)
-
The received message payload (
Msg.data) is aUint8Array(even if you published astring, it’s carried/handled as bytes under the hood, with helpers to convert):- Docs: “Payloads are
Uint8Arraysorstring”, and messages providestring()/json()helpers. (nats-io.github.io) - JetStream
JsMsg.datais explicitlyUint8Arrayin the typings. (app.unpkg.com)
- Docs: “Payloads are
-
Node.js
Bufferworks as a payload becauseBufferis a subclass ofUint8Array, so it satisfiesUint8Arraywherever NATS expects bytes. (nodejs.org)
Practical mapping:
- If you have a string → send as
stringdirectly or useStringCodec().encode(...)(givesUint8Array). (nats-io.github.io) - If you have binary (
Uint8ArrayorBuffer) → send directly as the payload. (tessl.io)
Citations:
- 1: https://tessl.io/registry/tessl/npm-nats/2.29.0/files/docs/messaging.md?utm_source=openai
- 2: https://nats-io.github.io/nats.js/core/index.html
- 3: https://app.unpkg.com/nats%402.24.0/files/lib/jetstream/jsmsg.d.ts?utm_source=openai
- 4: https://nodejs.org/api/buffer.html?utm_source=openai
- 5: https://nats-io.github.io/nats.js/core/index.html
- 6: https://tessl.io/registry/tessl/npm-nats/2.29.0/files/docs/messaging.md?utm_source=openai
🏁 Script executed:
# Check other usages of jsc.publish in the codebase
rg "jsc\.publish|jetstream.*publish" apps/notification/src/ -B 2 -A 3Repository: credebl/platform
Length of output: 629
Use Buffer.from() for consistent payload encoding, matching the pattern used elsewhere in the codebase.
JetStreamClient.publish accepts both string and Uint8Array payloads. However, nats.service.ts already uses Buffer.from(JSON.stringify(payload)) for consistency and explicit byte handling. Apply the same pattern here.
✅ Suggested fix
- const ack = await jsc.publish(
- `${subject}`,
- // sc.encode(JSON.stringify(payload))
- JSON.stringify(payload)
- );
+ const encoded = Buffer.from(JSON.stringify(payload));
+ const ack = await jsc.publish(subject, encoded);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const ack = await jsc.publish( | |
| `${subject}`, | |
| // sc.encode(JSON.stringify(payload)) | |
| JSON.stringify(payload) | |
| ); | |
| const encoded = Buffer.from(JSON.stringify(payload)); | |
| const ack = await jsc.publish(subject, encoded); |
🤖 Prompt for AI Agents
In `@apps/notification/src/nats/jetstream.setup.ts` around lines 96 - 100, The
call to jsc.publish currently passes JSON.stringify(payload) as a string; change
it to pass Buffer.from(JSON.stringify(payload)) to match the encoding pattern
used elsewhere (e.g., nats.service.ts) so the payload is consistently sent as
bytes. Update the publish call that assigns ack from jsc.publish(subject, ...)
to use Buffer.from(JSON.stringify(payload)) (referencing jsc.publish, subject,
payload, and ack) and ensure any TypeScript types/imports remain valid after the
change.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@apps/notification/src/nats/jetstream.consumer.ts`:
- Around line 63-68: The code currently logs full message payloads which may
contain sensitive data; update the logging in the JetStream consumer to avoid
printing raw payloads: replace uses of this.logger.debug that log the full
payload and the this.logger.log/this.logger.debug calls that might include
notificationDetail by logging only non-sensitive metadata (e.g., sessionId,
orgCode, domain, event, and payload keys via Object.keys(payload)) or masked
summaries instead of the full payload or notificationDetail objects; target the
logging calls named logger.log, logger.debug, and any use of notificationDetail
in this module and ensure only metadata or masked values are emitted.
- Around line 123-127: Inside the consume method's for-await loop, using
"return" when notificationDetail is falsy exits the entire consumer instead of
skipping the current message; change the control flow to call msg.nak() and use
"continue" to skip processing this message so the for-await loop keeps running
(locate the block that checks notificationDetail, the msg.nak() call, and
replace the "return" with "continue").
🧹 Nitpick comments (2)
.env.sample (1)
267-269: Stream names are now correct; add trailing newline.The stream names no longer have trailing semicolons (good). However, the file is missing a trailing newline at line 269, which the linter flags.
🧹 Add trailing newline
PULL_CONSUMER=hub-pull-consumer +.env.demo (1)
246-248: Stream names are correct; add trailing newline.The stream names are properly set without trailing semicolons. Add a trailing newline to satisfy the linter.
🧹 Add trailing newline
PULL_CONSUMER=hub-pull-consumer +
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
Signed-off-by: shitrerohit <rohit.shitre@ayanworks.com>
7d05810 to
71bccd3
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@apps/notification/src/nats/pendingAckStore.ts`:
- Around line 13-29: The in-memory pending map (store) can grow unbounded when
ACKs never arrive; add expiry-based eviction by introducing a TTL constant
(e.g., PENDING_TTL_MS), storing receivedAt (already present) and implementing a
purgeExpired() method that iterates store keys and deletes entries older than
TTL; call purgeExpired() on a periodic timer started in the class constructor
(use setInterval) and ensure the timer is cleared in a destroy/close method to
avoid leaks; update save()/makeKey()/PendingMessage usage as needed to rely on
receivedAt and ensure tests cover eviction behavior.
In `@apps/oid4vc-verification/src/oid4vc-verification.service.ts`:
- Around line 395-396: Replace the unsafe JSON.stringify(error) used in the
createVerificationPresentation error logs with ErrorHandler.format(error) to
ensure safe serialization (handles circular refs and preserves message/stack);
update the this.logger.error calls inside the createVerificationPresentation
flow (and the similar logger call later around the second occurrence noted) to
call ErrorHandler.format(error) instead of JSON.stringify(error), ensuring
ErrorHandler is imported where needed.
| private readonly store = new Map<PendingKey, PendingMessage>(); | ||
|
|
||
| makeKey(stream: string, consumer: string, streamSequence: number): PendingKey { | ||
| return `${stream}:${consumer}:${streamSequence}`; | ||
| } | ||
|
|
||
| save(stream: string, consumer: string, msg: JsMsg): PendingKey { | ||
| const { info } = msg; | ||
| if (!info) { | ||
| throw new Error('Cannot save message without info metadata'); | ||
| } | ||
| const key = this.makeKey(stream, consumer, info.streamSequence); | ||
|
|
||
| this.store.set(key, { | ||
| msg, | ||
| receivedAt: Date.now() | ||
| }); |
There was a problem hiding this comment.
Add eviction for unacked messages to avoid unbounded growth.
If ACKs never arrive, the in-memory map grows indefinitely. Consider an expiry-based purge and call it periodically.
🧹 Suggested addition
export class PendingAckStore {
private readonly store = new Map<PendingKey, PendingMessage>();
@@
delete(key: PendingKey): void {
this.store.delete(key);
}
+
+ purgeExpired(maxAgeMs: number): number {
+ const now = Date.now();
+ let removed = 0;
+ for (const [key, value] of this.store) {
+ if (now - value.receivedAt > maxAgeMs) {
+ this.store.delete(key);
+ removed++;
+ }
+ }
+ return removed;
+ }
}🤖 Prompt for AI Agents
In `@apps/notification/src/nats/pendingAckStore.ts` around lines 13 - 29, The
in-memory pending map (store) can grow unbounded when ACKs never arrive; add
expiry-based eviction by introducing a TTL constant (e.g., PENDING_TTL_MS),
storing receivedAt (already present) and implementing a purgeExpired() method
that iterates store keys and deletes entries older than TTL; call purgeExpired()
on a periodic timer started in the class constructor (use setInterval) and
ensure the timer is cleared in a destroy/close method to avoid leaks; update
save()/makeKey()/PendingMessage usage as needed to rely on receivedAt and ensure
tests cover eviction behavior.
| this.logger.error( | ||
| `[createVerificationPresentation] - error creating verification presentation: ${JSON.stringify(error?.response ?? error)}` | ||
| `[createVerificationPresentation] - error creating verification presentation: ${JSON.stringify(error)}` |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
rg -n "safeStringify|stringifyError|formatError|serializeError|safe-stringify|fast-safe-stringify|util\.inspect" -SRepository: credebl/platform
Length of output: 663
🏁 Script executed:
wc -l apps/oid4vc-verification/src/oid4vc-verification.service.tsRepository: credebl/platform
Length of output: 123
🏁 Script executed:
sed -n '390,400p' apps/oid4vc-verification/src/oid4vc-verification.service.tsRepository: credebl/platform
Length of output: 476
🏁 Script executed:
sed -n '450,460p' apps/oid4vc-verification/src/oid4vc-verification.service.tsRepository: credebl/platform
Length of output: 502
🏁 Script executed:
# Check error handling patterns in the codebase
rg "JSON\.stringify\(error" -n -B 2 -A 2Repository: credebl/platform
Length of output: 50373
🏁 Script executed:
# Check if there are existing error utilities/helpers
fd -t f "error|util" apps/oid4vc-verification/src | head -20Repository: credebl/platform
Length of output: 42
🏁 Script executed:
cat -n libs/common/src/utils/error-handler.util.tsRepository: credebl/platform
Length of output: 6939
🏁 Script executed:
head -30 apps/oid4vc-verification/src/oid4vc-verification.service.tsRepository: credebl/platform
Length of output: 1525
🏁 Script executed:
rg "ErrorHandler|fast-safe-stringify" apps/oid4vc-verification/src/oid4vc-verification.service.ts -nRepository: credebl/platform
Length of output: 42
🏁 Script executed:
# Check if ErrorHandler is already used in similar services
rg "ErrorHandler" apps/verification/src/verification.service.ts -nRepository: credebl/platform
Length of output: 42
Use ErrorHandler.format() for safe error serialization in logs.
The codebase provides ErrorHandler.format() (libs/common/src/utils/error-handler.util.ts) specifically to handle this case: it safely handles circular references, preserves error details (message/stack), and falls back to util.inspect() for richer output. Replace JSON.stringify(error) with ErrorHandler.format(error).
🛠️ Suggested fix
+import { ErrorHandler } from '@credebl/common/utils/error-handler.util';
this.logger.error(
- `[createVerificationPresentation] - error creating verification presentation: ${JSON.stringify(error)}`
+ `[createVerificationPresentation] - error creating verification presentation: ${ErrorHandler.format(error)}`
);
throw new RpcException(error?.response ?? error); this.logger.error(
- `[getVerificationSessionResponse] - error: ${JSON.stringify(error)}`
+ `[getVerificationSessionResponse] - error: ${ErrorHandler.format(error)}`
);
throw new RpcException(error?.response ?? error);Also applies to: 454-455
🤖 Prompt for AI Agents
In `@apps/oid4vc-verification/src/oid4vc-verification.service.ts` around lines 395
- 396, Replace the unsafe JSON.stringify(error) used in the
createVerificationPresentation error logs with ErrorHandler.format(error) to
ensure safe serialization (handles circular refs and preserves message/stack);
update the this.logger.error calls inside the createVerificationPresentation
flow (and the similar logger call later around the second occurrence noted) to
call ErrorHandler.format(error) instead of JSON.stringify(error), ensuring
ErrorHandler is imported where needed.
There was a problem hiding this comment.
@RinkalBhojani We need to make to store a persistent store using NATS KV, so that there is no chance of losing the pending messages in case of server/service restarts.
There was a problem hiding this comment.
Yes, I will do that in next PR.
Signed-off-by: Rinkal Bhojani <rinkal.bhojani@ayanworks.com>
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@apps/notification/src/nats/jetstream.consumer.ts`:
- Around line 84-90: The branch that handles deliveries exceeding maxDeliver
calls moveMsgToDidStream(notificationDetail, msg) and
sendNotificationToHolder(event, orgCode, notificationDetail) without checking
notificationDetail; add a null/undefined guard before these calls (e.g., if
(!notificationDetail) { log.warn/handle and ack/nack as appropriate } ) to avoid
runtime errors, and only call moveMsgToDidStream and sendNotificationToHolder
when notificationDetail is present; reference the variables notificationDetail
and msg and the methods moveMsgToDidStream and sendNotificationToHolder to
locate and update the logic.
🧹 Nitpick comments (3)
apps/oid4vc-verification/src/oid4vc-verification.service.ts (1)
338-339: Avoid logging full template payloads.Logging the entire template object can inflate logs and risk leaking sensitive data. Consider logging only intent/orgId (or a template identifier if available).
Proposed change
- this.logger.debug( - `[createIntentBasedVerificationPresentation] template fetched successfully: ${JSON.stringify(templateData)}` - ); + this.logger.debug( + `[createIntentBasedVerificationPresentation] template fetched successfully for intent=${intent}, orgId=${orgId}` + );apps/notification/src/nats/nats.service.ts (1)
20-28: Use validated variables consistently.Lines 20-23 validate and destructure the env vars, but lines 26-27 still reference
process.envdirectly. Use the validated variables for consistency and to avoid redundant lookups.♻️ Proposed fix
this.nc = await connect({ - servers: `${process.env.NATS_URL}`.split(','), - authenticator: usernamePasswordAuthenticator(`${process.env.NATS_USER}`, `${process.env.NATS_PASSWORD}`) + servers: NATS_URL.split(','), + authenticator: usernamePasswordAuthenticator(NATS_USER, NATS_PASSWORD) });apps/notification/src/nats/jetstream.consumer.ts (1)
148-148: Add explicit type formsgparameter.The
msgparameter lacks a type annotation, defaulting toany. Add theJsMsgtype for better type safety and IDE support.♻️ Proposed fix
+import { Consumer, JsMsg } from 'nats'; ... - private async moveMsgToDidStream(notificationDetail: IHolderNotification, msg): Promise<void> { + private async moveMsgToDidStream(notificationDetail: IHolderNotification, msg: JsMsg): Promise<void> {
| if (maxDeliver - 1 < msg.info.deliveryCount) { | ||
| //------------ Moving message to DID stream ---------------// | ||
| await this.moveMsgToDidStream(notificationDetail, msg); | ||
|
|
||
| //------------- Sending Push Notification via FCM ----------------// | ||
| this.sendNotificationToHolder(event, orgCode, notificationDetail); | ||
| msg.ack(); |
There was a problem hiding this comment.
Missing null check for notificationDetail in maxDeliver branch.
When deliveryCount exceeds maxDeliver - 1, the code calls moveMsgToDidStream(notificationDetail, msg) and sendNotificationToHolder(event, orgCode, notificationDetail) without verifying notificationDetail is not null. This will cause a runtime error if the notification detail doesn't exist.
🔧 Proposed fix
if (maxDeliver - 1 < msg.info.deliveryCount) {
+ if (!notificationDetail) {
+ this.logger.error(`[NATS] No notification detail found for session ID: ${sessionId}, message exhausted`);
+ msg.ack(); // Ack to prevent infinite retries since max deliver reached
+ continue;
+ }
//------------ Moving message to DID stream ---------------//
await this.moveMsgToDidStream(notificationDetail, msg);🤖 Prompt for AI Agents
In `@apps/notification/src/nats/jetstream.consumer.ts` around lines 84 - 90, The
branch that handles deliveries exceeding maxDeliver calls
moveMsgToDidStream(notificationDetail, msg) and sendNotificationToHolder(event,
orgCode, notificationDetail) without checking notificationDetail; add a
null/undefined guard before these calls (e.g., if (!notificationDetail) {
log.warn/handle and ack/nack as appropriate } ) to avoid runtime errors, and
only call moveMsgToDidStream and sendNotificationToHolder when
notificationDetail is present; reference the variables notificationDetail and
msg and the methods moveMsgToDidStream and sendNotificationToHolder to locate
and update the logic.



What?
Summary by CodeRabbit