Skip to content

Conversation

@anxolin
Copy link
Contributor

@anxolin anxolin commented Apr 21, 2025

Hackathon PUSH notification.

This PR includes a bunch of PRs related to including PUSH notification feature

Includes:

Summary by CodeRabbit

  • New Features

    • Introduced a modular notification producer architecture supporting both CMS and trade notifications with improved reliability and graceful shutdown.
    • Added persistent indexer state tracking in the database for reliable notification processing.
    • Enabled batch processing and sending of multiple push notifications.
    • Implemented repository and service factories for streamlined dependency management.
    • Added utility functions for formatting, logging, and environment validation.
  • Improvements

    • Enhanced Telegram notification handling with batch processing, retries, and robust error handling.
    • Unified and simplified pagination and API usage for fetching subscriptions and accounts.
    • Standardized logging across services using a shared logger.
    • Improved Redis and database configuration flexibility via environment variables.
  • Bug Fixes

    • Prevented sending empty notification batches to the queue.
    • Fixed cache and API endpoint inconsistencies in subscription fetching.
  • Documentation

    • Updated README with clearer local setup instructions and added a quick test procedure for push notifications.
    • Added example database environment variable to the environment template.
  • Chores

    • Upgraded dependencies and TypeScript target version.
    • Updated .gitignore to exclude new files.
  • Tests

    • Added and refactored tests for notification queue posting and repository logic.

anxolin added 5 commits April 18, 2025 12:42
* feat: refactor producers

* feat: implement runnables

* fix: fix build

* chore: improve comment

* chore: fix name
* fwat: get all subscriptions

* feat: get subscribed users and create skeleton of the trade producer
* feat: create db and update

* feat: add retry logic

* feat: gracefully stop

* fix: fix introspection issue

* feat: graceful

* chore: remove

* chore: allow to disable redis

* fix: fix update db

* fix: handle concurrency edge case
* feat: refactor IoC and format amounts

* chore: use shared
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Apr 21, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

This update introduces a modular, repository-driven architecture for the notification producer, enabling scalable and resilient notification delivery. It adds new repository interfaces and implementations for push notifications, push subscriptions, and indexer state persistence using PostgreSQL. The notification producer is refactored to use producer classes (CmsNotificationProducer and TradeNotificationProducer) that continuously fetch and send notifications for subscribed accounts, with stateful tracking and graceful shutdown. Supporting utilities for logging, formatting, and repeated execution (doForever) are introduced in shared libraries. Notification parsing and queueing are updated to support batch operations. The README and environment configuration are updated to reflect new setup requirements.

Changes

Files / Groups Change Summary
.gitignore, .env.example, package.json, tsconfig.base.json Add ignore rule for mcp.json; add DATABASE_HOST example; update pg version and add types; set TS target to ES2020.
README.md Update local setup instructions for notifications, describe new dependencies, and add test procedure.
apps/api/src/app/inversify.config.ts, apps/api/src/main.ts, apps/api/tsconfig.app.json Refactor dependency injection to use repository factory functions from @cowprotocol/services; update logger import; include shared logger in TS config.
apps/notification-producer/db.sql Create indexer_state table with triggers for state persistence.
apps/notification-producer/src/main.ts Refactor to modular producer-based architecture; replace monolithic logic with producer lifecycle management and graceful shutdown.
apps/notification-producer/src/postToQueueTest.test.ts, apps/notification-producer/src/postToQueueTest.ts Add new test for posting notifications to queue; remove old test script.
apps/notification-producer/types.ts Add Runnable interface for producer lifecycle management.
apps/notification-producer/src/producers/cms/CmsNotificationProducer.ts Add CMS notification producer class with in-memory pending cache and periodic notification fetching.
apps/notification-producer/src/producers/trade/TradeNotificationProducer.ts Add trade notification producer class with indexer state persistence and block range processing.
apps/notification-producer/src/producers/trade/fromOrderInvalidationToNotification.ts, apps/notification-producer/src/producers/trade/fromTradeToNotification.ts, apps/notification-producer/src/producers/trade/getTradeNotifications.ts Add functions for transforming blockchain events into notifications and fetching trade-related logs.
apps/telegram/src/main.ts Update to batch notification parsing, add retry mechanism, standardize logging, and refactor main loop.
libs/cms-api/src/index.ts Refactor pagination logic, update endpoints, and add error handling for fetching subscriptions and accounts.
libs/notifications/src/index.ts Rename Notification to PushNotification, update parsing and queueing to batch operations, and guard against empty sends.
libs/repositories/src/IndexerStateRepository/IndexerStateRepository.ts, libs/repositories/src/PushNotificationsRepository/PushNotificationsRepository.ts, libs/repositories/src/PushSubscriptionsRepository/PushSubscriptionsRepository.ts, libs/repositories/src/datasources/postgres.ts Add repository interfaces and implementations for indexer state, push notifications, and subscriptions; add Postgres pool creation.
libs/repositories/src/SimulationRepository/SimulationRepositoryTenderly.ts, libs/repositories/src/UsdRepository/UsdRepositoryCow.ts, libs/repositories/src/UsdRepository/UsdRepositoryCow.spec.ts, libs/repositories/src/UsdRepository/UsdRepositoryCoingecko.test.ts Standardize logging using shared logger; remove logger injection and mocks.
libs/repositories/src/datasources/redis.ts Improve Redis enablement logic to allow explicit override.
libs/repositories/src/datasources/cowApi.ts, apps/twap/src/app/utils/getApiBaseUrl.ts Use shared network name mappings for API URLs.
libs/repositories/src/index.ts Update exports for new repositories and data sources.
libs/services/src/factories.ts, libs/services/src/index.ts Add repository factory functions and export them for use in dependency injection.
libs/services/src/SimulationService/SimulationService.ts, libs/services/src/SlippageService/SlippageService.ts, libs/services/src/SlippageService/SlippageServiceMain.spec.ts, libs/services/src/SlippageService/SlippageServiceMain.ts, libs/services/src/SlippageService/SlippageServiceMock.ts, libs/services/src/TokenHolderService/TokenHolderService.ts, libs/services/src/UsdService/UsdService.ts Standardize import of SupportedChainId from shared package.
libs/shared/src/const.ts Add EXPLORER_NETWORK_NAMES and COW_API_NETWORK_NAMES constants.
libs/shared/src/index.ts Refactor exports for granular utility modules and add shared logger export.
libs/shared/src/logger.ts Add shared logger instance using createLogger.
libs/shared/src/types.ts Add exported Logger type alias for pino.Logger.
libs/shared/src/utils/doForever.ts Add doForever utility for repeated async execution with logging and stop control.
libs/shared/src/utils/format.ts Add formatting utilities for explorer URLs, token amounts, and token names.
libs/shared/src/utils/logger.ts Rename and export logger factory as createLogger; remove singleton export.
libs/shared/src/utils/misc.ts Add utilities for bigint JSON handling, sleep, and environment variable checking.

Sequence Diagram(s)

sequenceDiagram
    participant CmsProducer as CmsNotificationProducer
    participant TradeProducer as TradeNotificationProducer
    participant IndexerRepo as IndexerStateRepository
    participant PushRepo as PushNotificationsRepository
    participant SubRepo as PushSubscriptionsRepository
    participant Queue as Notification Queue
    participant DB as Postgres DB

    Note over CmsProducer,TradeProducer: On startup (mainLoop)
    CmsProducer->>SubRepo: getAllSubscribedAccounts()
    SubRepo->>DB: (if cache expired) fetch accounts
    CmsProducer->>CmsProducer: fetch CMS notifications for accounts
    CmsProducer->>PushRepo: connect()
    CmsProducer->>Queue: sendNotifications(notifications[])
    TradeProducer->>IndexerRepo: get(last processed block)
    TradeProducer->>TradeProducer: fetch new trade/order events (block range)
    TradeProducer->>SubRepo: getAllSubscribedAccounts()
    TradeProducer->>TradeProducer: generate notifications from events
    TradeProducer->>PushRepo: connect()
    TradeProducer->>Queue: sendNotifications(notifications[])
    TradeProducer->>IndexerRepo: upsert(new state)
Loading

Possibly related PRs

  • cowprotocol/bff#122: Refactors the notification producer to a modular producer-based architecture, introduces indexer state persistence, and graceful shutdown handling—directly related at the code level.
  • cowprotocol/bff#125: Involves restructuring and renaming of push notification repositories, repository interfaces, database connection logic, and improvements to the doForever utility—direct overlap with this PR.
  • cowprotocol/bff#121: Introduces and modifies the database indexer state table and its usage in the notification producer, enhancing state persistence—directly related.

Suggested reviewers

  • alfetopito

Poem

In the warren of code, a rabbit hops with glee,
Modular producers now fetch with esprit!
Notifications queued, with state held tight,
Repositories abound, everything just right.
With logs that are shared, and tests that are new,
This bunny’s delighted—so should be you!

((\
( -.-)
o_(")(")


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 714d8a0 and a01fadb.

📒 Files selected for processing (41)
  • .env.example (1 hunks)
  • .gitignore (1 hunks)
  • README.md (2 hunks)
  • apps/api/src/app/inversify.config.ts (4 hunks)
  • apps/notification-producer/db.sql (1 hunks)
  • apps/notification-producer/src/main.ts (1 hunks)
  • apps/notification-producer/src/postToQueueTest.test.ts (1 hunks)
  • apps/notification-producer/src/producers/cms/CmsNotificationProducer.ts (1 hunks)
  • apps/notification-producer/src/producers/trade/TradeNotificationProducer.ts (1 hunks)
  • apps/notification-producer/src/producers/trade/fromOrderInvalidationToNotification.ts (1 hunks)
  • apps/notification-producer/src/producers/trade/fromTradeToNotification.ts (1 hunks)
  • apps/notification-producer/src/producers/trade/getTradeNotifications.ts (1 hunks)
  • apps/telegram/src/main.ts (5 hunks)
  • apps/twap/src/app/utils/getApiBaseUrl.ts (1 hunks)
  • libs/notifications/src/index.ts (2 hunks)
  • libs/repositories/src/IndexerStateRepository/IndexerStateRepository.ts (1 hunks)
  • libs/repositories/src/PushNotificationsRepository/PushNotificationsRepository.ts (1 hunks)
  • libs/repositories/src/PushSubscriptionsRepository/PushSubscriptionsRepository.ts (1 hunks)
  • libs/repositories/src/SimulationRepository/SimulationRepositoryTenderly.ts (5 hunks)
  • libs/repositories/src/UsdRepository/UsdRepositoryCoingecko.test.ts (1 hunks)
  • libs/repositories/src/UsdRepository/UsdRepositoryCow.spec.ts (2 hunks)
  • libs/repositories/src/UsdRepository/UsdRepositoryCow.ts (6 hunks)
  • libs/repositories/src/datasources/cowApi.ts (1 hunks)
  • libs/repositories/src/datasources/postgres.ts (1 hunks)
  • libs/repositories/src/index.ts (2 hunks)
  • libs/services/src/SimulationService/SimulationService.ts (1 hunks)
  • libs/services/src/SlippageService/SlippageService.ts (1 hunks)
  • libs/services/src/SlippageService/SlippageServiceMain.spec.ts (2 hunks)
  • libs/services/src/SlippageService/SlippageServiceMain.ts (2 hunks)
  • libs/services/src/SlippageService/SlippageServiceMock.ts (1 hunks)
  • libs/services/src/TokenHolderService/TokenHolderService.ts (1 hunks)
  • libs/services/src/UsdService/UsdService.ts (1 hunks)
  • libs/services/src/factories.ts (1 hunks)
  • libs/shared/src/const.ts (2 hunks)
  • libs/shared/src/index.ts (1 hunks)
  • libs/shared/src/logger.ts (1 hunks)
  • libs/shared/src/types.ts (2 hunks)
  • libs/shared/src/utils/doForever.ts (1 hunks)
  • libs/shared/src/utils/format.ts (1 hunks)
  • libs/shared/src/utils/logger.ts (1 hunks)
  • libs/shared/src/utils/misc.ts (2 hunks)
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@socket-security
Copy link

socket-security bot commented Apr 21, 2025

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

Diff Package Supply Chain
Security
Vulnerability Quality Maintenance License
Addedpg-numeric@​1.0.21001006675100
Added@​types/​pg@​8.11.131001007188100
Addedobuf@​1.1.21001007176100
Updatedpg-protocol@​1.7.0 ⏵ 1.8.0100 +110072 +187 +7100
Updatedpg-pool@​3.7.0 ⏵ 3.8.0100 +110010090 +8100
Updatedpg@​8.13.1 ⏵ 8.14.199 +110098 +192100

View full report

@anxolin anxolin changed the title Push feat: add Push notifications for trades and cancelations Apr 21, 2025
@anxolin anxolin marked this pull request as ready for review April 21, 2025 17:27
@anxolin anxolin requested a review from a team April 21, 2025 17:27
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 15

🔭 Outside diff range comments (3)
apps/api/src/app/inversify.config.ts (1)

36-44: ⚠️ Potential issue

Import the Logger type to fix pipeline failure

Logger is referenced in the binding but never imported, causing TS2304.
Assuming the logger instance is a pino logger:

-import { logger } from '@cowprotocol/shared';
+import { logger } from '@cowprotocol/shared';
+import type { Logger } from 'pino';

Alternatively, re‑export Logger from @cowprotocol/shared and import it from there.

🧰 Tools
🪛 GitHub Check: docker (apps/notification-producer)

[failure] 43-43:
Cannot find name 'Logger'.

🪛 GitHub Check: docker (apps/telegram)

[failure] 43-43:
Cannot find name 'Logger'.

🪛 GitHub Check: docker (apps/twap)

[failure] 43-43:
Cannot find name 'Logger'.

🪛 GitHub Check: docker (apps/api)

[failure] 43-43:
Cannot find name 'Logger'.

🪛 GitHub Actions: Publish Docker image

[error] 43-43: TypeScript error TS2304: Cannot find name 'Logger'.

libs/cms-api/src/index.ts (1)

172-197: ⚠️ Potential issue

Pagination off‑by‑one: always re‑fetching the same page
Inside getAllPages, page is incremented after fetching, so the same page 0 is requested repeatedly – resulting in an infinite loop or duplicated data.

-  while (subscriptions.length > pageSize) {
-    subscriptions = await getPage({
-      page,
+  while (subscriptions.length > pageSize) {
+    page += 1;                        // ✅ advance page number first
+    subscriptions = await getPage({
+      page,
       pageSize: pageSize + 1,
     });
apps/telegram/src/main.ts (1)

48-53: 🛠️ Refactor suggestion

Cache timestamp is never updated – forces CMS call on every message
getSubscriptions stores the subscriptions in SUBSCRIPTION_CACHE but forgets to update LAST_SUBSCRIPTION_CHECK, so the “stale” check always fails.

-    SUBSCRIPTION_CACHE.set(account, subscriptionForAccount);
+    SUBSCRIPTION_CACHE.set(account, subscriptionForAccount);
+    LAST_SUBSCRIPTION_CHECK.set(account, new Date());
🧹 Nitpick comments (16)
apps/notification-producer/db.sql (2)

1-8: Define a primary key and rename reserved column.

  • It's recommended to explicitly define a primary key rather than relying solely on a unique constraint.
  • The column name key is a SQL reserved word; consider renaming it (e.g., state_key) to avoid quoting issues.
  • Adopt snake_case for chain_id for consistency with common conventions.
- CREATE TABLE notifications_indexer_state (
-   key TEXT NOT NULL,
-   chainId INTEGER,
+ CREATE TABLE notifications_indexer_state (
+   state_key TEXT NOT NULL,
+   chain_id INTEGER NOT NULL,

11-17: Use table-specific trigger function naming.
The function set_updated_at is generically named and could collide with other triggers. Rename it to something like notifications_indexer_state_set_updated_at for clarity and to avoid namespace conflicts.

apps/notification-producer/src/utils.ts (1)

3-28: Well-implemented utility for long-running tasks

The doForever function provides a robust pattern for running asynchronous operations indefinitely with proper error handling, logging, and a controlled shutdown mechanism.

Consider implementing an exponential backoff strategy for error recovery instead of using a fixed wait time. This would be more resilient in environments with intermittent connection issues:

- console.log(
-   `[${name}] Reconnecting in ${waitTimeMilliseconds / 1000}s...`
- );
+ let retryDelay = waitTimeMilliseconds;
+ console.log(
+   `[${name}] Reconnecting in ${retryDelay / 1000}s...`
+ );

Then in the loop:

export async function doForever(
  name: string,
  callback: (stop: () => void) => Promise<void>,
  waitTimeMilliseconds: number
) {
  // eslint-disable-next-line no-constant-condition
  let running = true;
+ let errorCount = 0;
+ const maxBackoff = 60000; // 1 minute maximum backoff
  while (running) {
    const stop = () => {
      console.log(`[${name}] Stopping...`);
      running = false;
    };

    try {
      await callback(stop);
+     // Reset error count on successful execution
+     errorCount = 0;
    } catch (error) {
      console.error(`[${name}] Error `, error);
+     // Calculate exponential backoff with jitter
+     const backoff = Math.min(
+       waitTimeMilliseconds * Math.pow(2, errorCount),
+       maxBackoff
+     );
+     const jitter = Math.random() * 0.3 + 0.85; // 0.85-1.15 range
+     const delay = Math.floor(backoff * jitter);
+     errorCount++;
      console.log(
-       `[${name}] Reconnecting in ${waitTimeMilliseconds / 1000}s...`
+       `[${name}] Reconnecting in ${delay / 1000}s...`
      );
+     await sleep(delay);
+     continue;
    } finally {
-     await sleep(waitTimeMilliseconds);
+     if (errorCount === 0) {
+       await sleep(waitTimeMilliseconds);
+     }
    }
  }
  console.log(`[${name}] Stopped`);
}
libs/shared/src/utils.ts (2)

13-32: Well-structured logging configuration

The getLogger function provides good flexibility with environment-based configuration options. The defaults are sensible - pretty printing for development and JSON for production.

Consider validating the LOG_LEVEL environment variable to ensure it's a valid log level:

+ const validLogLevels = ['trace', 'debug', 'info', 'warn', 'error', 'fatal'];
+ const logLevel = process.env.LOG_LEVEL ?? 'info';
+ 
+ if (process.env.LOG_LEVEL && !validLogLevels.includes(process.env.LOG_LEVEL)) {
+   console.warn(`Invalid LOG_LEVEL "${process.env.LOG_LEVEL}", defaulting to "info"`);
+ }

  return pino({
    ...loggerConfigEnv,
-   level: process.env.LOG_LEVEL ?? 'info',
+   level: validLogLevels.includes(logLevel) ? logLevel : 'info',
  });

77-82: Strengthen the regex pattern for bigint deserialization

The current regex pattern might match unintended strings.

A more precise regex pattern would ensure only valid bigint strings are matched:

- if (typeof value === 'string' && /^\d+n$/.test(value)) {
+ if (typeof value === 'string' && /^[0-9]+n$/.test(value)) {
apps/notification-producer/src/repositories/NotificationsRepository.ts (1)

55-62: Consider cheaper health‑checks

Creating a channel on every pingConnection() call can be heavy under high load.
If the AMQP library exposes connection.closed or similar, prefer that, or throttle pings with a timestamp cache.
(Not blocking, just a perf note.)

libs/notifications/src/index.ts (2)

23-31: Harden JSON (de)serialisation with validation & error handling

Blind JSON.parse or JSON.stringify can crash the whole consumer/producer on malformed input or on values that
JSON.stringify cannot encode (e.g. bigint). Consider:

-export function parseNotifications(
-  notificationsString: string
-): Notification[] {
-  return JSON.parse(notificationsString);
+export function parseNotifications(notificationsString: string): Notification[] {
+  try {
+    const data = JSON.parse(notificationsString);
+    assert(Array.isArray(data), 'Expected an array of notifications');
+    return data;
+  } catch (err) {
+    console.error('[notifications] Failed to parse payload', {
+      payload: notificationsString,
+      err,
+    });
+    throw err;
+  }
 }
 
-export function stringifyNotifications(notifications: Notification[]): string {
-  return JSON.stringify(notifications);
+export function stringifyNotifications(notifications: Notification[]): string {
+  return JSON.stringify(notifications /*, bigIntReplacer */);
 }

(The optional bigIntReplacer guards against future schema changes that introduce bigint fields.)


78-81: Back‑pressure from channel.sendToQueue is ignored

Channel#sendToQueue returns false when the write buffer is full.
If the producer floods the queue, messages will start getting dropped locally.

At minimum:

-  channel.sendToQueue(queue, Buffer.from(message));
+  const ok = channel.sendToQueue(queue, Buffer.from(message));
+  if (!ok) {
+    console.warn('[notifications] RabbitMQ write buffer is full ‑ pausing...');
+  }

or switch to a confirm channel (createConfirmChannel) for delivery guarantees.

apps/notification-producer/src/producers/TradeNotificationProducer.ts (2)

332-336: Misleading title for order‑invalidated notifications

The user sees a “Trade” title for a cancellation event.

-    title: 'Trade',
+    title: 'Order invalidated',

119-127: Large owner filter array may overflow RPC limits

Passing hundreds/thousands of addresses to viem.getLogs could exceed node
limits and yield invalid argument errors. Consider batching or filtering
client‑side instead.

apps/notification-producer/src/main.ts (1)

1-6: Duplicate reflect-metadata import – remove the second one
The file imports reflect-metadata twice (lines 1 and 5). The second import is redundant and can be safely deleted.

-import 'reflect-metadata';   // 👈 line 5 – remove
libs/cms-api/src/index.ts (1)

143-161: Minor duplication – getAllNotifications could reuse getAllPages helper
Now that a generic paginator exists, getAllNotifications duplicates pagination logic. Refactoring it to call getAllPages would reduce code size and the chance of divergent behaviour.

Nice‑to‑have; can be addressed separately.

libs/services/src/factories.ts (4)

28-31: Constants for cache durations provide good configurability

The use of the ms library for time constants enhances readability. However, consider extracting these constants to a configuration file to make them more easily adjustable without modifying the code.

-const DEFAULT_CACHE_VALUE_SECONDS = ms('2min') / 1000; // 2min cache time by default for values
-const DEFAULT_CACHE_NULL_SECONDS = ms('30min') / 1000; // 30min cache time by default for NULL values (when the repository isn't known)
-
-const CACHE_TOKEN_INFO_SECONDS = ms('24h') / 1000; // 24h
+// Import from a configuration file or use environment variables
+const DEFAULT_CACHE_VALUE_SECONDS = Number(process.env.DEFAULT_CACHE_VALUE_SECONDS) || ms('2min') / 1000;
+const DEFAULT_CACHE_NULL_SECONDS = Number(process.env.DEFAULT_CACHE_NULL_SECONDS) || ms('30min') / 1000;
+const CACHE_TOKEN_INFO_SECONDS = Number(process.env.CACHE_TOKEN_INFO_SECONDS) || ms('24h') / 1000;

33-42: ERC20 repository implementation looks solid

The implementation correctly wraps the Viem-based repository with a cache layer. Consider adding JSDoc comments to clarify the function's purpose and parameter requirements.

+/**
+ * Creates an ERC20 repository with caching layer
+ * @param cacheRepository - Repository used for caching ERC20 token data
+ * @returns Cached ERC20 repository instance
+ */
 export function getErc20Repository(
   cacheRepository: CacheRepository
 ): Erc20Repository {
   return new Erc20RepositoryCache(
     new Erc20RepositoryViem(viemClients),
     cacheRepository,
     'erc20',
     CACHE_TOKEN_INFO_SECONDS
   );
 }

69-79: USD repository from Coingecko properly configured

Similar to the Cow repository, this is well-structured. However, there's an inconsistency: the Cow repository gets a logger but this one doesn't.

 export function getUsdRepositoryCoingecko(
   cacheRepository: CacheRepository
 ): UsdRepository {
   return new UsdRepositoryCache(
-    new UsdRepositoryCoingecko(),
+    new UsdRepositoryCoingecko(logger.child({ module: 'usd-coingecko' })),
     cacheRepository,
     'usdCoingecko',
     DEFAULT_CACHE_VALUE_SECONDS,
     DEFAULT_CACHE_NULL_SECONDS
   );
 }

The implementation depends on whether UsdRepositoryCoingecko accepts a logger parameter. If it doesn't, consider modifying that class to accept a logger for consistency.


91-113: Token holder repositories follow consistent pattern

Both token holder repositories follow the same pattern as the USD repositories, which is good for consistency. However, like the Coingecko repository, they seem to be missing loggers.

If the underlying repository implementations (TokenHolderRepositoryEthplorer and TokenHolderRepositoryMoralis) accept loggers, consider adding them for consistent logging across all repositories.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3a50c15 and 714d8a0.

⛔ Files ignored due to path filters (1)
  • yarn.lock is excluded by !**/yarn.lock, !**/*.lock
📒 Files selected for processing (27)
  • README.md (1 hunks)
  • apps/api/src/app/inversify.config.ts (4 hunks)
  • apps/api/src/logger.ts (0 hunks)
  • apps/api/src/main.ts (1 hunks)
  • apps/api/tsconfig.app.json (1 hunks)
  • apps/notification-producer/db.sql (1 hunks)
  • apps/notification-producer/src/main.ts (1 hunks)
  • apps/notification-producer/src/postToQueueTest.test.ts (1 hunks)
  • apps/notification-producer/src/postToQueueTest.ts (0 hunks)
  • apps/notification-producer/src/producers/CmsNotificationProducer.ts (1 hunks)
  • apps/notification-producer/src/producers/TradeNotificationProducer.ts (1 hunks)
  • apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts (1 hunks)
  • apps/notification-producer/src/repositories/NotificationsRepository.ts (1 hunks)
  • apps/notification-producer/src/repositories/SubscriptionsRepository.ts (1 hunks)
  • apps/notification-producer/src/utils.ts (1 hunks)
  • apps/notification-producer/types.ts (1 hunks)
  • apps/telegram/src/main.ts (5 hunks)
  • libs/cms-api/src/index.ts (4 hunks)
  • libs/notifications/src/index.ts (2 hunks)
  • libs/repositories/src/datasources/redis.ts (1 hunks)
  • libs/services/src/factories.ts (1 hunks)
  • libs/services/src/index.ts (1 hunks)
  • libs/shared/src/index.ts (1 hunks)
  • libs/shared/src/logger.ts (1 hunks)
  • libs/shared/src/utils.ts (2 hunks)
  • package.json (2 hunks)
  • tsconfig.base.json (1 hunks)
💤 Files with no reviewable changes (2)
  • apps/notification-producer/src/postToQueueTest.ts
  • apps/api/src/logger.ts
🧰 Additional context used
🧬 Code Graph Analysis (9)
libs/shared/src/logger.ts (1)
libs/shared/src/utils.ts (1)
  • getLogger (13-32)
apps/notification-producer/src/postToQueueTest.test.ts (1)
libs/notifications/src/index.ts (3)
  • connectToChannel (47-70)
  • NOTIFICATIONS_QUEUE (4-4)
  • sendNotificationsToQueue (78-82)
apps/notification-producer/src/utils.ts (3)
apps/notification-producer/src/producers/TradeNotificationProducer.ts (1)
  • stop (79-84)
apps/notification-producer/src/producers/CmsNotificationProducer.ts (1)
  • stop (54-57)
libs/notifications/src/index.ts (1)
  • sleep (34-36)
apps/notification-producer/src/repositories/NotificationsRepository.ts (1)
libs/notifications/src/index.ts (5)
  • ConnectToChannelResponse (42-45)
  • NOTIFICATIONS_QUEUE (4-4)
  • connectToChannel (47-70)
  • Notification (15-21)
  • sendNotificationsToQueue (78-82)
apps/notification-producer/src/producers/TradeNotificationProducer.ts (10)
apps/notification-producer/src/repositories/NotificationsRepository.ts (1)
  • NotificationsRepository (10-66)
apps/notification-producer/src/repositories/SubscriptionsRepository.ts (1)
  • SubscriptionRepository (6-23)
apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts (1)
  • NotificationsIndexerStateRepository (12-48)
libs/repositories/src/Erc20Repository/Erc20Repository.ts (2)
  • Erc20Repository (12-19)
  • Erc20 (5-10)
apps/notification-producer/types.ts (1)
  • Runnable (4-14)
libs/notifications/src/index.ts (1)
  • Notification (15-21)
apps/notification-producer/src/utils.ts (1)
  • doForever (3-28)
apps/notification-producer/src/producers/CmsNotificationProducer.ts (1)
  • stop (54-57)
libs/repositories/src/datasources/viem.ts (1)
  • viemClients (13-31)
libs/shared/src/utils.ts (1)
  • bigIntReplacer (70-75)
apps/notification-producer/src/repositories/SubscriptionsRepository.ts (1)
libs/cms-api/src/index.ts (1)
  • getAllSubscribedAccounts (143-148)
apps/api/src/app/inversify.config.ts (1)
libs/services/src/factories.ts (1)
  • getUsdRepository (81-89)
libs/services/src/factories.ts (9)
libs/repositories/src/CacheRepository/CacheRepository.ts (1)
  • CacheRepository (3-7)
libs/repositories/src/Erc20Repository/Erc20Repository.ts (1)
  • Erc20Repository (12-19)
libs/repositories/src/datasources/viem.ts (1)
  • viemClients (13-31)
libs/repositories/src/datasources/redis.ts (1)
  • redisClient (9-17)
libs/repositories/src/UsdRepository/UsdRepository.ts (1)
  • UsdRepository (24-35)
libs/repositories/src/datasources/cowApi.ts (1)
  • cowApiClients (18-30)
libs/shared/src/logger.ts (1)
  • logger (3-3)
libs/repositories/src/TokenHolderRepository/TokenHolderRepository.ts (1)
  • TokenHolderRepository (11-16)
libs/repositories/src/SimulationRepository/SimulationRepository.ts (1)
  • SimulationRepository (24-29)
apps/telegram/src/main.ts (2)
libs/cms-api/src/index.ts (1)
  • CmsTelegramSubscription (6-9)
libs/notifications/src/index.ts (2)
  • Notification (15-21)
  • parseNotifications (23-27)
🪛 Biome (1.9.4)
apps/notification-producer/src/main.ts

[error] 95-95: void is confusing outside a return type or a type parameter.

Unsafe fix: Use undefined instead.

(lint/suspicious/noConfusingVoidType)

🪛 GitHub Actions: Publish Docker image
apps/api/src/app/inversify.config.ts

[error] 43-43: TypeScript error TS2304: Cannot find name 'Logger'.

🔇 Additional comments (22)
package.json (2)

58-58: Update pg dependency to a newer patch version
Upgrading pg from ^8.13.1 to ^8.14.1 aligns with the new PostgreSQL-backed repositories in the notification producer and pulls in any patch fixes.


82-82: Add TypeScript typings for pg
Introducing @types/pg ensures proper TypeScript support and compile-time checking for our PostgreSQL client usage.

tsconfig.base.json (1)

11-11: Review ES target upgrade for runtime compatibility.
Confirm that the deployment environments (Node versions and bundlers) fully support ES2020 output to avoid runtime issues. For example, ensure your CI and production Node versions are ≥14.

apps/api/src/main.ts (1)

3-3: Verify shared logger compatibility with Fastify.
Ensure that the logger exported by @cowprotocol/shared satisfies Fastify's expected FastifyLoggerInstance interface (e.g., Pino-compatible). A mismatch may cause runtime errors when initializing the server.

libs/services/src/index.ts (1)

10-10: Confirm presence and build inclusion of factories module.
Make sure that libs/services/src/factories/index.ts exists and correctly exports the repository factory functions. Missing or misnamed files can lead to TypeScript or bundler errors.

apps/api/tsconfig.app.json (1)

6-6: Validate types and include settings in tsconfig.app.json.

  • By specifying "types": ["node"], you may be restricting other ambient type definitions—confirm that no additional globals are needed.
  • Including only "../../libs/shared/src/logger.ts" could omit other shared package files if they’re imported; ensure all required shared sources (e.g., from @cowprotocol/services) are compiled.

Also applies to: 9-9

libs/shared/src/index.ts (1)

5-5: Centralized logger export added

Good addition. This export makes the centralized logger accessible to all modules importing from the shared library, supporting the architecture refactor for unified logging.

libs/shared/src/logger.ts (1)

1-3: Clean implementation of centralized logger singleton

This implementation correctly creates a singleton logger instance using the getLogger utility. This centralization will help maintain consistent logging configuration across all services.

README.md (1)

36-37: Documentation updated with database container step

Good addition to the setup instructions. This correctly documents the need to start the PostgreSQL database container before the notification producer, aligning with the architectural changes that now require database persistence for notification state tracking.

apps/notification-producer/types.ts (1)

1-14: Well-defined Runnable interface

The Runnable interface is well-documented and provides a clean contract for implementing lifecycle management in the new producer classes. This interface is central to the architectural refactor from monolithic to modular notification producers.

libs/repositories/src/datasources/redis.ts (2)

3-7: Improved Redis configuration logic

The updated Redis enablement logic now properly prioritizes the explicit REDIS_ENABLED environment variable over the presence of REDIS_HOST. This makes the configuration more flexible and explicit, allowing Redis to be explicitly disabled even when a host is configured.


9-9: Updated Redis client initialization to use the new flag

The client initialization now correctly references the updated isRedisEnabled flag, maintaining consistency with the new configuration logic.

apps/notification-producer/src/repositories/SubscriptionsRepository.ts (1)

25-27: Efficient normalization helper function

The uniqueLowercase function elegantly handles both lowercase conversion and deduplication of account addresses using Set and Array.from.

libs/shared/src/utils.ts (2)

11-11: Good type aliasing practice

Exporting the Logger type alias provides a clean abstraction over the underlying library type, making it easier to switch logging implementations in the future.


70-75: Good JSON serialization helper for bigint

The bigIntReplacer function correctly handles the serialization of JavaScript's native bigint type, which isn't natively supported by JSON.stringify.

apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts (1)

38-46: ON CONFLICT may generate duplicates when chainId is NULL

PostgreSQL treats NULL values as distinct, so ON CONFLICT (key, chainId) will not match rows where chainId is NULL.
If you intend (key, NULL) to be unique, use a partial unique index (WHERE chainId IS NULL) or coalesce chainId to a sentinel value (e.g., 0) before inserting.

apps/api/src/app/inversify.config.ts (1)

53-58: Symbol mismatch likely causes wrong DI binding

You bind a SimulationRepository to tenderlyRepositorySymbol, yet the service layer expects simulationRepositorySymbol.
Double‑check the consumer side; mismatched symbols break dependency injection silently.

-  .bind<SimulationRepository>(tenderlyRepositorySymbol)
+  .bind<SimulationRepository>(simulationRepositorySymbol)
apps/notification-producer/src/producers/TradeNotificationProducer.ts (1)

107-110: Potential gap on first run – verify block range logic

Starting from lastBlock when there is no stored state skips every event
that happened before the latest block. Is this deliberate?

If you intend to replay a larger history on first start, initialise
fromBlock with 0n or an environment‑configurable value.

libs/services/src/factories.ts (4)

1-27: Good organization of imports and dependencies

The organization of imports is clean, with a clear separation between external libraries and internal modules. The module imports all necessary repository types and implementations, which aligns well with its factory pattern purpose.


52-67: USD repository from Cow API properly configured

The repository is correctly created with appropriate dependencies. Good use of child logger with context. Consider making cache duration configurable per repository instance.


81-89: Fallback USD repository provides good resilience

Creating a fallback repository that tries multiple data sources is an excellent approach for resilience. The order (Coingecko first, then Cow) is sensible if Coingecko has higher reliability or performance.


115-122: Fallback token holder repository follows good pattern

Similar to the USD repository fallback, this provides resilience by trying multiple data sources. The code structure is consistent with the rest of the file.


const CACHE_TIME = 30000;

// TODO: Move to repositories and make a proper cached repository and use DI
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

TODO should be addressed

The comment indicates this code needs further refactoring to use proper dependency injection. This should be prioritized in a future PR for improved testability and maintainability.

Comment on lines 6 to 23
export class SubscriptionRepository {
private lastCheck: number | null = null;
private cachedAccounts: string[] | null = null;

async getAllSubscribedAccounts(): Promise<string[]> {
const now = Date.now();
if (
!this.cachedAccounts ||
!this.lastCheck ||
now - this.lastCheck > CACHE_TIME
) {
this.cachedAccounts = uniqueLowercase(await getAllSubscribedAccounts());
this.lastCheck = now;
return this.cachedAccounts;
}
return this.cachedAccounts || [];
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling in the repository

The repository lacks error handling for API calls and doesn't provide any logging for debugging issues.

Improve the implementation with proper error handling and logging:

import { getAllSubscribedAccounts } from '@cowprotocol/cms-api';
+ import { Logger, getLogger } from '@cowprotocol/shared';

const CACHE_TIME = 30000;

// TODO: Move to repositories and make a proper cached repository and use DI
export class SubscriptionRepository {
  private lastCheck: number | null = null;
  private cachedAccounts: string[] | null = null;
+ private logger: Logger;
+ 
+ constructor() {
+   this.logger = getLogger().child({ context: 'SubscriptionRepository' });
+ }

  async getAllSubscribedAccounts(): Promise<string[]> {
    const now = Date.now();
    if (
      !this.cachedAccounts ||
      !this.lastCheck ||
      now - this.lastCheck > CACHE_TIME
    ) {
+     try {
+       this.logger.debug('Cache expired or empty, fetching subscribed accounts');
        this.cachedAccounts = uniqueLowercase(await getAllSubscribedAccounts());
        this.lastCheck = now;
+       this.logger.debug({ count: this.cachedAccounts.length }, 'Successfully cached accounts');
        return this.cachedAccounts;
+     } catch (error) {
+       this.logger.error({ error }, 'Failed to fetch subscribed accounts');
+       // Return existing cache if available, otherwise empty array
+       return this.cachedAccounts || [];
+     }
    }
+   this.logger.debug({ count: this.cachedAccounts.length }, 'Using cached accounts');
    return this.cachedAccounts || [];
  }
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export class SubscriptionRepository {
private lastCheck: number | null = null;
private cachedAccounts: string[] | null = null;
async getAllSubscribedAccounts(): Promise<string[]> {
const now = Date.now();
if (
!this.cachedAccounts ||
!this.lastCheck ||
now - this.lastCheck > CACHE_TIME
) {
this.cachedAccounts = uniqueLowercase(await getAllSubscribedAccounts());
this.lastCheck = now;
return this.cachedAccounts;
}
return this.cachedAccounts || [];
}
}
import { getAllSubscribedAccounts } from '@cowprotocol/cms-api';
import { Logger, getLogger } from '@cowprotocol/shared';
const CACHE_TIME = 30000;
// TODO: Move to repositories and make a proper cached repository and use DI
export class SubscriptionRepository {
private lastCheck: number | null = null;
private cachedAccounts: string[] | null = null;
private logger: Logger;
constructor() {
this.logger = getLogger().child({ context: 'SubscriptionRepository' });
}
async getAllSubscribedAccounts(): Promise<string[]> {
const now = Date.now();
if (
!this.cachedAccounts ||
!this.lastCheck ||
now - this.lastCheck > CACHE_TIME
) {
try {
this.logger.debug('Cache expired or empty, fetching subscribed accounts');
this.cachedAccounts = uniqueLowercase(await getAllSubscribedAccounts());
this.lastCheck = now;
this.logger.debug({ count: this.cachedAccounts.length }, 'Successfully cached accounts');
return this.cachedAccounts;
} catch (error) {
this.logger.error({ error }, 'Failed to fetch subscribed accounts');
// Return existing cache if available, otherwise empty array
return this.cachedAccounts || [];
}
}
this.logger.debug({ count: this.cachedAccounts.length }, 'Using cached accounts');
return this.cachedAccounts || [];
}
}

Comment on lines 21 to 23
// Watch for connection close
this.connection.connection.on('close', () => this.close());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid un‑handled promise in the 'close' event listener

this.close() is async, but the handler does not await or handle its rejection, which can lead to an unhandled‑rejection warning.

-      this.connection.connection.on('close', () => this.close());
+      this.connection.connection.on('close', () => {
+        // Fire‑and‑forget but surface errors for visibility
+        void this.close().catch((err) =>
+          console.error('[NotificationsRepository] error while closing', err),
+        );
+      });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Watch for connection close
this.connection.connection.on('close', () => this.close());
// Watch for connection close
this.connection.connection.on('close', () => {
// Fire‑and‑forget but surface errors for visibility
void this.close().catch((err) =>
console.error('[NotificationsRepository] error while closing', err),
);
});

Comment on lines 66 to 76
// Run all producers in the background
const promises = producers.map((producer) =>
producer
.start()
.then(() => {
if (!shuttingDown) producer.start();
})
.catch(() => {
if (!shuttingDown) producer.start();
})
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Restart loop loses promise references → early termination risk
producer.start() is re‑invoked inside the .then/.catch handlers, but the new promise is not added to promises. producersPromise therefore resolves when the first cycle ends and the main loop exits while producers keep running in the background.

-const promises = producers.map((producer) =>
-  producer
-    .start()
-    .then(() => {
-      if (!shuttingDown) producer.start();
-    })
-    .catch(() => {
-      if (!shuttingDown) producer.start();
-    })
-);
+const promises = producers.map(async (producer) => {
+  // keep restarting until shutdown flag is set
+  while (!shuttingDown) {
+    try {
+      await producer.start();
+    } catch (err) {
+      console.error(`[notification-producer] ${producer.constructor.name} crashed – restarting`, err);
+    }
+  }
+});

This way the promise in the array only resolves when shuttingDown becomes true.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Run all producers in the background
const promises = producers.map((producer) =>
producer
.start()
.then(() => {
if (!shuttingDown) producer.start();
})
.catch(() => {
if (!shuttingDown) producer.start();
})
);
// Run all producers in the background
const promises = producers.map(async (producer) => {
// keep restarting until shutdown flag is set
while (!shuttingDown) {
try {
await producer.start();
} catch (err) {
console.error(
`[notification-producer] ${producer.constructor.name} crashed – restarting`,
err
);
}
}
});

Copy link
Contributor

@alfetopito alfetopito left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build is failing.
And there are several good comments from rabbitai that should be handled.

return new SimulationRepositoryTenderly(logger.child({ module: 'tenderly' }));
}
import { Container } from 'inversify';
import { logger } from '@cowprotocol/shared';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing the type import?

image

* feat: misc improvements and cleanup after hackathon

* feat: make the stop function to wake any sleeping runner

* fix: fix compile errors

* feat: reuse the same pool

* chore: ignore MCP config

* fix: fix test and do not inject the logger

* feat: make it easier to test notifications

* feat: handle empty array

* chore: improve format message and log

* fix: fix test by removing logger

* fix: fix tests

* chore: refactor

* fix: fix compile errors

* fix: compile errors

* chore: improve error logging

* chore: avoid name collision

* chore: implement repository

* chore: remove duplicate

* feat: make sure envs are defined when we ask for a new postgress pool

* chore: move enums to utils

* chore: move to consts

* fix: fix no awaited ping

* fix: fix issue with re-try logic

* chore: misc improvements

* fix: make sure we delete from the queue

* chore: ODC nitpick

Co-authored-by: Leandro <alfetopito@users.noreply.github.com>

---------

Co-authored-by: Leandro <alfetopito@users.noreply.github.com>
@anxolin anxolin merged commit 436bf25 into main Apr 22, 2025
6 of 7 checks passed
@anxolin anxolin deleted the push branch April 22, 2025 15:42
Copy link
Contributor

@alfetopito alfetopito left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One potential bug to fix

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants