From 8c62f57a8168027b6e621393b315dd530c05fc15 Mon Sep 17 00:00:00 2001 From: prajwal Date: Thu, 7 Aug 2025 16:54:45 +0530 Subject: [PATCH 1/5] migration-script to push user data to kafka --- src/migrations/pushDataToKafka/README.md | 93 ++++++ .../pushDataToKafka/pushUserDataToKafka.js | 312 ++++++++++++++++++ src/package.json | 1 + 3 files changed, 406 insertions(+) create mode 100644 src/migrations/pushDataToKafka/README.md create mode 100644 src/migrations/pushDataToKafka/pushUserDataToKafka.js diff --git a/src/migrations/pushDataToKafka/README.md b/src/migrations/pushDataToKafka/README.md new file mode 100644 index 000000000..e49d4c6a2 --- /dev/null +++ b/src/migrations/pushDataToKafka/README.md @@ -0,0 +1,93 @@ +# πŸ“€ Push User Data to Kafka + +This Node.js script pushes user data (along with organization, roles, and enriched location metadata) to a Kafka topic (`userEvents`) based on users updated within a specified date range. + +--- + +## 🧾 Table of Contents + +- [πŸ“¦ Prerequisites](#-prerequisites) +- [βš™οΈ Configuration](#️-configuration) +- [πŸš€ Usage](#-usage) +- [πŸ“„ What the Script Does](#-what-the-script-does) +- [πŸ“Œ Notes](#-notes) +- [πŸ“ File Structure](#-file-structure) + +--- + +## πŸ“¦ Prerequisites + +- Node.js installed +- PostgreSQL database with user, organization, and role data +- Kafka running locally or remotely +- `.env` file configured (see [βš™οΈ Configuration](#️-configuration)) + +--- + +## βš™οΈ Configuration + +The script expects a `.env` file at the **project root (`../../.env`)** with the following variables: + +```env +DEV_DATABASE_URL=postgres://user:password@localhost:5432/database +ENTITY_MANAGEMENT_SERVICE_BASE_URL=http://localhost:5000 +INTERNAL_ACCESS_TOKEN=your_internal_token +``` + +--- + +## πŸš€ Usage + +Run the script using: + +```bash +node pushUserDataToKafka.js --from="YYYY-MM-DD" --to="YYYY-MM-DD" +``` + +- `--from`: Start date (inclusive) +- `--to`: End date (inclusive) + +πŸ“Œ **Example:** + +```bash +node pushUserDataToKafka.js --from="2025-07-01" --to="2025-07-30" +``` + +--- + +## πŸ“„ What the Script Does + +1. **Connects to PostgreSQL** using the provided `DEV_DATABASE_URL`. +2. **Fetches all users** updated between `from` and `to` dates. +3. For each user: + - Fetches their associated organizations and roles. + - Builds a Kafka event object. + - Enriches location metadata (`state`, `district`, `block`, etc.) by querying an external service. + - Pushes the event to the Kafka topic: `userEvents`. + +βœ… **Location enrichment handles failures gracefully**: + +- Missing/failing entities return `{}` or `[{}]` instead of `null`. + +--- + +## πŸ“Œ Notes + +- The script assumes Kafka is configured via the file at `../../configs/kafka/index.js`. +- Events are published using `eventBroadcasterKafka` helper. +- If an Axios request fails (e.g., service down), it logs an error and continues with the next user. +- Deleted users are detected via `deleted_at IS NOT NULL` and tagged as `"eventType": "delete"`. + +--- + +## πŸ“ File Structure + +``` +. +β”œβ”€β”€ pushUserDataToKafka.js +β”œβ”€β”€ ../../.env +β”œβ”€β”€ ../../configs/kafka/ +β”‚ └── index.js +β”œβ”€β”€ ../../helpers/eventBroadcasterMain.js +└── ... +``` diff --git a/src/migrations/pushDataToKafka/pushUserDataToKafka.js b/src/migrations/pushDataToKafka/pushUserDataToKafka.js new file mode 100644 index 000000000..a5b67ebdd --- /dev/null +++ b/src/migrations/pushDataToKafka/pushUserDataToKafka.js @@ -0,0 +1,312 @@ +require('module-alias/register') +require('dotenv').config({ path: '../../.env' }) + +// Initialize Kafka client +require('../../configs/kafka')() + +// External dependencies +const minimist = require('minimist') +const axios = require('axios') + +// Internal helper for Kafka broadcasting +const { eventBroadcasterKafka } = require('../../helpers/eventBroadcasterMain') +const { Client } = require('pg') + +/** + * Parses a PostgreSQL connection URL into a config object + */ +function parseDbUrl(url) { + const { URL } = require('url') + const dbUrl = new URL(url) + return { + user: dbUrl.username, + password: dbUrl.password, + host: dbUrl.hostname, + port: dbUrl.port, + database: dbUrl.pathname.slice(1), + ssl: dbUrl.searchParams.get('sslmode') === 'require', + } +} + +/** + * Fetches user details, along with associated organization and role information + */ +async function fetchUserOrgRoles(client, userId) { + const query = ` + SELECT + u.id AS user_id, + u.name AS user_name, + u.username, + u.email, + u.phone, + u.tenant_code AS user_tenant_code, + u.status AS user_status, + u.meta AS user_meta, + u.created_at AS user_created_at, + u.updated_at AS user_updated_at, + u.deleted_at AS user_deleted_at, + u.deleted_at IS NOT NULL AS deleted, + + org.id AS org_id, + org.name AS org_name, + org.code AS org_code, + org.description AS org_description, + org.status AS org_status, + org.related_orgs, + org.tenant_code AS org_tenant_code, + org.meta, + org.created_by AS org_created_by, + org.updated_by AS org_updated_by, + + r.id AS role_id, + r.title AS role_title, + r.label AS role_label, + r.user_type, + r.status AS role_status, + r.organization_id AS role_org_id, + r.visibility, + r.tenant_code AS role_tenant_code, + r.translations + FROM + users u + JOIN user_organizations uo + ON u.id = uo.user_id AND u.tenant_code = uo.tenant_code + JOIN organizations org + ON org.code = uo.organization_code AND org.tenant_code = uo.tenant_code + JOIN user_organization_roles uor + ON u.id = uor.user_id AND uor.organization_code = org.code AND uor.tenant_code = u.tenant_code + JOIN user_roles r + ON r.id = uor.role_id AND r.tenant_code = u.tenant_code + WHERE u.id = $1 + ` + const result = await client.query(query, [userId]) + return result.rows +} + +/** + * Builds a Kafka-compatible event object from user-org-role data + */ +function buildKafkaEvent(userRows) { + if (userRows.length === 0) return null + + const first = userRows[0] + const orgMap = new Map() + + // Group roles under their respective organizations + for (const row of userRows) { + if (!orgMap.has(row.org_id)) { + orgMap.set(row.org_id, { + id: row.org_id, + name: row.org_name, + code: row.org_code, + description: row.org_description, + status: row.org_status, + related_orgs: row.related_orgs, + tenant_code: row.org_tenant_code, + meta: row.meta, + created_by: row.org_created_by, + updated_by: row.org_updated_by, + roles: [], + }) + } + + orgMap.get(row.org_id).roles.push({ + id: row.role_id, + title: row.role_title, + label: row.role_label, + user_type: row.user_type, + status: row.role_status, + organization_id: row.role_org_id, + visibility: row.visibility, + tenant_code: row.role_tenant_code, + translations: row.translations, + }) + } + + // Determine event type based on timestamps and soft deletion + let eventType = 'update' + if (first.user_deleted_at) { + eventType = 'delete' + } else if (first.user_created_at && first.user_created_at.getTime() === first.user_updated_at.getTime()) { + eventType = 'create' + } + + return { + entity: 'user', + eventType: eventType, + entityId: first.user_id, + changes: {}, + created_by: first.user_id, + name: first.user_name, + username: first.username, + email: first.email, + phone: first.phone, + organizations: Array.from(orgMap.values()), + tenant_code: first.user_tenant_code, + status: first.user_status, + deleted: first.deleted, + id: first.user_id, + meta: first.user_meta, + created_at: first.user_created_at, + updated_at: first.user_updated_at, + deleted_at: first.user_deleted_at, + } +} + +/** + * Fetches entity metadata from external service using ID and tenantCode + */ +async function fetchEntityDetail(id, tenantCode) { + try { + if (id && Array.isArray(id) && id.length > 0) { + id = { + $in: id, + } + } + const projection = ['_id', 'metaInformation'] + let requestJSON = { + query: { _id: id, tenantId: tenantCode }, + projection: projection, + } + const options = { + headers: { + 'content-type': 'application/json', + 'internal-access-token': process.env.INTERNAL_ACCESS_TOKEN, + }, + } + const response = await axios.post( + `${process.env.ENTITY_MANAGEMENT_SERVICE_BASE_URL}/v1/entities/find`, + requestJSON, + options + ) + const entity = response.data?.result?.[0] + if (!entity) throw new Error(`Entity not found for id: ${id}`) + + return { + id: entity._id, + name: entity.metaInformation.name, + externalId: entity.metaInformation.externalId, + } + } catch (error) { + console.error('Axios Error:', error.response?.data?.message || error.message) + } +} + +/** + * Replaces location-related metadata in the event with enriched details + */ +async function enrichLocationFields(event) { + const meta = event.meta + if (!meta) return event + + const locationKeys = [ + 'state', + 'district', + 'block', + 'cluster', + 'school', + 'professional_role', + 'professional_subroles', + ] + const tenantCode = event.tenant_code + for (const key of locationKeys) { + const value = meta[key] + + if (!value) continue + + try { + if (Array.isArray(value)) { + event[key] = await Promise.all(value.map((id) => fetchEntityDetail(id, tenantCode))) + } else { + event[key] = await fetchEntityDetail(value, tenantCode) + } + } catch (err) { + console.warn(`Failed to fetch ${key} details:`, err.message) + + // Set to [{}] for arrays, {} otherwise + if (Array.isArray(value)) { + event[key] = value.map(() => ({})) + } else { + event[key] = {} + } + } + } + + delete event.meta + return event +} + +// Main execution block +;(async () => { + // Parse command line arguments + const argv = minimist(process.argv.slice(2)) + const fromDate = new Date(`${argv.from}T00:00:00Z`) + const toDate = new Date(`${argv.to}T23:59:59Z`) // End of the day + const dbUrl = process.env.DEV_DATABASE_URL + + // Validate input + if (!fromDate || !toDate) { + console.error('Usage: node script.js --from= --to=') + process.exit(1) + } + + if (isNaN(fromDate) || isNaN(toDate)) { + console.error('Invalid date provided. Example :- "2025-07-01"') + process.exit(1) + } + + // Setup DB connection + const dbConfig = parseDbUrl(dbUrl) + const client = new Client(dbConfig) + + try { + await client.connect() + + // Query user IDs updated within the specified range + const userQuery = ` + SELECT id FROM users + WHERE updated_at BETWEEN $1 AND $2 + ` + const res = await client.query(userQuery, [fromDate.toISOString(), toDate.toISOString()]) + const userIds = res.rows.map((r) => r.id) + + console.log(`Found ${userIds.length} users\n`) + + let successCount = 0 + let errorCount = 0 + + // Process each user + for (const userId of userIds) { + try { + const userData = await fetchUserOrgRoles(client, userId) + let kafkaEvent = buildKafkaEvent(userData) + + if (!kafkaEvent) { + console.warn(`No org-role data for user ${userId}, skipping`) + continue + } + + kafkaEvent = await enrichLocationFields(kafkaEvent) + + // Send event to Kafka + eventBroadcasterKafka('userEvents', { + requestBody: kafkaEvent, + }) + + console.log(`Pushed user ${userId}`) + successCount++ + } catch (err) { + console.error(`Failed to push user ${userId}:`, err.message) + errorCount++ + } + } + + // Summary + console.log(`\nDone. Total: ${userIds.length}, Success: ${successCount}, Failed: ${errorCount}`) + } catch (err) { + console.error('Unexpected Error:', err) + } finally { + await client.end() + process.exit(0) + } +})() diff --git a/src/package.json b/src/package.json index 4aa21d6eb..9d548160f 100644 --- a/src/package.json +++ b/src/package.json @@ -61,6 +61,7 @@ "kafkajs": "^2.2.0", "lodash": "^4.17.21", "md5": "^2.3.0", + "minimist": "^1.2.8", "module-alias": "^2.2.3", "moment": "^2.29.1", "moment-timezone": "^0.5.34", From bf528acf5d42ad63e0e57a8190f46743c7a2091c Mon Sep 17 00:00:00 2001 From: prajwal Date: Thu, 7 Aug 2025 17:35:40 +0530 Subject: [PATCH 2/5] coderabbit comments addressed --- src/migrations/pushDataToKafka/README.md | 4 ++-- .../pushDataToKafka/pushUserDataToKafka.js | 24 +++++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/migrations/pushDataToKafka/README.md b/src/migrations/pushDataToKafka/README.md index e49d4c6a2..b6bdb3201 100644 --- a/src/migrations/pushDataToKafka/README.md +++ b/src/migrations/pushDataToKafka/README.md @@ -29,7 +29,7 @@ This Node.js script pushes user data (along with organization, roles, and enrich The script expects a `.env` file at the **project root (`../../.env`)** with the following variables: ```env -DEV_DATABASE_URL=postgres://user:password@localhost:5432/database +DEV_DATABASE_URL=postgres://user:password@localhost:5432/database # Adjust for your environment ENTITY_MANAGEMENT_SERVICE_BASE_URL=http://localhost:5000 INTERNAL_ACCESS_TOKEN=your_internal_token ``` @@ -73,7 +73,7 @@ node pushUserDataToKafka.js --from="2025-07-01" --to="2025-07-30" ## πŸ“Œ Notes -- The script assumes Kafka is configured via the file at `../../configs/kafka/index.js`. +- The script assumes Kafka is configured via the file at `../../configs/kafka.js`. - Events are published using `eventBroadcasterKafka` helper. - If an Axios request fails (e.g., service down), it logs an error and continues with the next user. - Deleted users are detected via `deleted_at IS NOT NULL` and tagged as `"eventType": "delete"`. diff --git a/src/migrations/pushDataToKafka/pushUserDataToKafka.js b/src/migrations/pushDataToKafka/pushUserDataToKafka.js index a5b67ebdd..c74c7e63b 100644 --- a/src/migrations/pushDataToKafka/pushUserDataToKafka.js +++ b/src/migrations/pushDataToKafka/pushUserDataToKafka.js @@ -24,7 +24,7 @@ function parseDbUrl(url) { host: dbUrl.hostname, port: dbUrl.port, database: dbUrl.pathname.slice(1), - ssl: dbUrl.searchParams.get('sslmode') === 'require', + ssl: dbUrl.searchParams.get('sslmode') === 'require' ? { rejectUnauthorized: false } : false, } } @@ -172,6 +172,7 @@ async function fetchEntityDetail(id, tenantCode) { headers: { 'content-type': 'application/json', 'internal-access-token': process.env.INTERNAL_ACCESS_TOKEN, + timeout: 5000, // 5 seconds timeout }, } const response = await axios.post( @@ -189,6 +190,7 @@ async function fetchEntityDetail(id, tenantCode) { } } catch (error) { console.error('Axios Error:', error.response?.data?.message || error.message) + throw error // Let the caller handle the error appropriately } } @@ -255,6 +257,15 @@ async function enrichLocationFields(event) { process.exit(1) } + if (!dbUrl) { + console.error('DEV_DATABASE_URL environment variable is required') + process.exit(1) + } + if (!process.env.ENTITY_MANAGEMENT_SERVICE_BASE_URL || !process.env.INTERNAL_ACCESS_TOKEN) { + console.error('ENTITY_MANAGEMENT_SERVICE_BASE_URL and INTERNAL_ACCESS_TOKEN are required') + process.exit(1) + } + // Setup DB connection const dbConfig = parseDbUrl(dbUrl) const client = new Client(dbConfig) @@ -289,9 +300,14 @@ async function enrichLocationFields(event) { kafkaEvent = await enrichLocationFields(kafkaEvent) // Send event to Kafka - eventBroadcasterKafka('userEvents', { - requestBody: kafkaEvent, - }) + try { + await eventBroadcasterKafka('userEvents', { + requestBody: kafkaEvent, + }) + } catch (kafkaError) { + console.error(`Failed to publish to Kafka for user ${userId}:`, kafkaError.message) + throw kafkaError + } console.log(`Pushed user ${userId}`) successCount++ From 864a5d26e28f7b6b780e4a15223efc65e8cb6173 Mon Sep 17 00:00:00 2001 From: prajwal Date: Wed, 20 Aug 2025 11:33:35 +0530 Subject: [PATCH 3/5] added tenant filter and chunk method in the script --- .../pushDataToKafka/pushUserDataToKafka.js | 112 ++++++++++++------ 1 file changed, 73 insertions(+), 39 deletions(-) diff --git a/src/migrations/pushDataToKafka/pushUserDataToKafka.js b/src/migrations/pushDataToKafka/pushUserDataToKafka.js index c74c7e63b..afacc0e9d 100644 --- a/src/migrations/pushDataToKafka/pushUserDataToKafka.js +++ b/src/migrations/pushDataToKafka/pushUserDataToKafka.js @@ -7,10 +7,46 @@ require('../../configs/kafka')() // External dependencies const minimist = require('minimist') const axios = require('axios') +const { Kafka } = require('kafkajs') // Internal helper for Kafka broadcasting const { eventBroadcasterKafka } = require('../../helpers/eventBroadcasterMain') const { Client } = require('pg') +const BATCH_SIZE = 50 + +// Kafka healthCheck +async function initKafka() { + let producer + if (!producer) { + const kafka = new Kafka({ + clientId: 'mentoring', + brokers: process.env.KAFKA_URL.split(','), + }) + + producer = kafka.producer() + + try { + // Add a timeout so it doesn’t retry forever + await Promise.race([ + producer.connect(), + new Promise((_, reject) => setTimeout(() => reject(new Error('Kafka connect timeout')), 3000)), + ]) + console.log('βœ… Kafka is reachable') + } catch (err) { + console.error('❌ Kafka is not reachable:', err.message) + process.exit(1) + } + } + return producer +} + +function chunkArray(array, size) { + const result = [] + for (let i = 0; i < array.length; i += size) { + result.push(array.slice(i, i + size)) + } + return result +} /** * Parses a PostgreSQL connection URL into a config object @@ -31,7 +67,7 @@ function parseDbUrl(url) { /** * Fetches user details, along with associated organization and role information */ -async function fetchUserOrgRoles(client, userId) { +async function fetchUserOrgRoles(client, userId, tenantId) { const query = ` SELECT u.id AS user_id, @@ -77,9 +113,9 @@ async function fetchUserOrgRoles(client, userId) { ON u.id = uor.user_id AND uor.organization_code = org.code AND uor.tenant_code = u.tenant_code JOIN user_roles r ON r.id = uor.role_id AND r.tenant_code = u.tenant_code - WHERE u.id = $1 + WHERE u.id = $1 AND u.tenant_code = $2 ` - const result = await client.query(query, [userId]) + const result = await client.query(query, [userId, tenantId]) return result.rows } @@ -240,15 +276,19 @@ async function enrichLocationFields(event) { // Main execution block ;(async () => { + // Kafka healthCheck call + await initKafka() + // Parse command line arguments const argv = minimist(process.argv.slice(2)) const fromDate = new Date(`${argv.from}T00:00:00Z`) const toDate = new Date(`${argv.to}T23:59:59Z`) // End of the day + const tenantId = argv.tenantId const dbUrl = process.env.DEV_DATABASE_URL // Validate input - if (!fromDate || !toDate) { - console.error('Usage: node script.js --from= --to=') + if (!fromDate || !toDate || !tenantId) { + console.error('Usage: node script.js --from= --to= --tenantId=') process.exit(1) } @@ -273,52 +313,46 @@ async function enrichLocationFields(event) { try { await client.connect() - // Query user IDs updated within the specified range + // Query user IDs updated within the specified range and tenant const userQuery = ` SELECT id FROM users WHERE updated_at BETWEEN $1 AND $2 + AND tenant_code = $3 ` - const res = await client.query(userQuery, [fromDate.toISOString(), toDate.toISOString()]) + const res = await client.query(userQuery, [fromDate.toISOString(), toDate.toISOString(), tenantId]) const userIds = res.rows.map((r) => r.id) - console.log(`Found ${userIds.length} users\n`) + console.log(`Found ${userIds.length} users in tenant ${tenantId}\n`) - let successCount = 0 - let errorCount = 0 + const chunks = chunkArray(userIds, BATCH_SIZE) // Process each user - for (const userId of userIds) { - try { - const userData = await fetchUserOrgRoles(client, userId) - let kafkaEvent = buildKafkaEvent(userData) - - if (!kafkaEvent) { - console.warn(`No org-role data for user ${userId}, skipping`) - continue - } - - kafkaEvent = await enrichLocationFields(kafkaEvent) - - // Send event to Kafka - try { - await eventBroadcasterKafka('userEvents', { - requestBody: kafkaEvent, - }) - } catch (kafkaError) { - console.error(`Failed to publish to Kafka for user ${userId}:`, kafkaError.message) - throw kafkaError - } - - console.log(`Pushed user ${userId}`) - successCount++ - } catch (err) { - console.error(`Failed to push user ${userId}:`, err.message) - errorCount++ - } + for (const chunk of chunks) { + await Promise.all( + chunk.map(async (userId) => { + try { + const userData = await fetchUserOrgRoles(client, userId, tenantId) + let kafkaEvent = buildKafkaEvent(userData) + + if (!kafkaEvent) { + console.warn(`No org-role data for user ${userId}, skipping`) + return + } + + kafkaEvent = await enrichLocationFields(kafkaEvent) + + await eventBroadcasterKafka('userEvents', { requestBody: kafkaEvent }) + + console.log(`Pushed user ${userId}`) + } catch (err) { + console.error(`Failed to push user ${userId}:`, err.message) + } + }) + ) } // Summary - console.log(`\nDone. Total: ${userIds.length}, Success: ${successCount}, Failed: ${errorCount}`) + console.log(`\nDone. Total: ${userIds.length}`) } catch (err) { console.error('Unexpected Error:', err) } finally { From 69d95eae25a6602e058a7385211345c039021f41 Mon Sep 17 00:00:00 2001 From: prajwal Date: Wed, 20 Aug 2025 12:49:11 +0530 Subject: [PATCH 4/5] updated md file for kafka-migration-script --- src/migrations/pushDataToKafka/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/migrations/pushDataToKafka/README.md b/src/migrations/pushDataToKafka/README.md index b6bdb3201..b77e02b50 100644 --- a/src/migrations/pushDataToKafka/README.md +++ b/src/migrations/pushDataToKafka/README.md @@ -41,16 +41,17 @@ INTERNAL_ACCESS_TOKEN=your_internal_token Run the script using: ```bash -node pushUserDataToKafka.js --from="YYYY-MM-DD" --to="YYYY-MM-DD" +node pushUserDataToKafka.js --from="YYYY-MM-DD" --to="YYYY-MM-DD" --tenantId="TENANT-ID" ``` - `--from`: Start date (inclusive) - `--to`: End date (inclusive) +- `--tenantId`: Tenant Id πŸ“Œ **Example:** ```bash -node pushUserDataToKafka.js --from="2025-07-01" --to="2025-07-30" +node pushUserDataToKafka.js --from="2025-07-01" --to="2025-07-30" --tenantId="shikshalokam" ``` --- From fc33f34a2ab2d28db25d603a14823e9f0933231c Mon Sep 17 00:00:00 2001 From: prajwal Date: Tue, 9 Sep 2025 08:30:50 +0530 Subject: [PATCH 5/5] comments addressed --- .../pushDataToKafka/pushUserDataToKafka.js | 152 +++++++++++------- src/package.json | 2 +- 2 files changed, 97 insertions(+), 57 deletions(-) diff --git a/src/migrations/pushDataToKafka/pushUserDataToKafka.js b/src/migrations/pushDataToKafka/pushUserDataToKafka.js index afacc0e9d..92c4231c2 100644 --- a/src/migrations/pushDataToKafka/pushUserDataToKafka.js +++ b/src/migrations/pushDataToKafka/pushUserDataToKafka.js @@ -8,11 +8,18 @@ require('../../configs/kafka')() const minimist = require('minimist') const axios = require('axios') const { Kafka } = require('kafkajs') +const pLimit = require('p-limit') // concurrency limiter // Internal helper for Kafka broadcasting const { eventBroadcasterKafka } = require('../../helpers/eventBroadcasterMain') const { Client } = require('pg') -const BATCH_SIZE = 50 +const BATCH_SIZE = 200 + +// Simple in-memory cache for entity details +const entityCache = new Map() + +// Limit concurrency of API calls to avoid overwhelming service +const limit = pLimit(10) // allow max 10 concurrent API requests // Kafka healthCheck async function initKafka() { @@ -66,8 +73,9 @@ function parseDbUrl(url) { /** * Fetches user details, along with associated organization and role information + * for multiple users in a single query (optimized with WHERE id = ANY($1)) */ -async function fetchUserOrgRoles(client, userId, tenantId) { +async function fetchUserOrgRoles(client, userIds, tenantId) { const query = ` SELECT u.id AS user_id, @@ -113,12 +121,23 @@ async function fetchUserOrgRoles(client, userId, tenantId) { ON u.id = uor.user_id AND uor.organization_code = org.code AND uor.tenant_code = u.tenant_code JOIN user_roles r ON r.id = uor.role_id AND r.tenant_code = u.tenant_code - WHERE u.id = $1 AND u.tenant_code = $2 + WHERE u.id = ANY($1) AND u.tenant_code = $2 ` - const result = await client.query(query, [userId, tenantId]) + const result = await client.query(query, [userIds, tenantId]) return result.rows } +/** + * Groups rows by user_id for processing (using reduce) + */ +function groupByUserId(rows) { + return rows.reduce((acc, row) => { + if (!acc[row.user_id]) acc[row.user_id] = [] + acc[row.user_id].push(row) + return acc + }, {}) +} + /** * Builds a Kafka-compatible event object from user-org-role data */ @@ -191,43 +210,56 @@ function buildKafkaEvent(userRows) { /** * Fetches entity metadata from external service using ID and tenantCode + * with caching + concurrency limiting */ async function fetchEntityDetail(id, tenantCode) { - try { - if (id && Array.isArray(id) && id.length > 0) { - id = { - $in: id, + // Check cache first + const cacheKey = `${tenantCode}:${JSON.stringify(id)}` + if (entityCache.has(cacheKey)) { + return entityCache.get(cacheKey) + } + + return limit(async () => { + try { + if (id && Array.isArray(id) && id.length > 0) { + id = { + $in: id, + } } + const projection = ['_id', 'metaInformation'] + let requestJSON = { + query: { _id: id, tenantId: tenantCode }, + projection: projection, + } + const options = { + headers: { + 'content-type': 'application/json', + 'internal-access-token': process.env.INTERNAL_ACCESS_TOKEN, + timeout: 5000, // 5 seconds timeout + }, + } + const response = await axios.post( + `${process.env.ENTITY_MANAGEMENT_SERVICE_BASE_URL}/v1/entities/find`, + requestJSON, + options + ) + const entity = response.data?.result?.[0] + if (!entity) throw new Error(`Entity not found for id: ${id}`) + + const entityObj = { + id: entity._id, + name: entity.metaInformation.name, + externalId: entity.metaInformation.externalId, + } + + // Save to cache + entityCache.set(cacheKey, entityObj) + return entityObj + } catch (error) { + console.error('Axios Error:', error.response?.data?.message || error.message) + throw error // Let the caller handle the error appropriately } - const projection = ['_id', 'metaInformation'] - let requestJSON = { - query: { _id: id, tenantId: tenantCode }, - projection: projection, - } - const options = { - headers: { - 'content-type': 'application/json', - 'internal-access-token': process.env.INTERNAL_ACCESS_TOKEN, - timeout: 5000, // 5 seconds timeout - }, - } - const response = await axios.post( - `${process.env.ENTITY_MANAGEMENT_SERVICE_BASE_URL}/v1/entities/find`, - requestJSON, - options - ) - const entity = response.data?.result?.[0] - if (!entity) throw new Error(`Entity not found for id: ${id}`) - - return { - id: entity._id, - name: entity.metaInformation.name, - externalId: entity.metaInformation.externalId, - } - } catch (error) { - console.error('Axios Error:', error.response?.data?.message || error.message) - throw error // Let the caller handle the error appropriately - } + }) } /** @@ -326,29 +358,37 @@ async function enrichLocationFields(event) { const chunks = chunkArray(userIds, BATCH_SIZE) - // Process each user + // Process each batch of users for (const chunk of chunks) { - await Promise.all( - chunk.map(async (userId) => { - try { - const userData = await fetchUserOrgRoles(client, userId, tenantId) - let kafkaEvent = buildKafkaEvent(userData) - - if (!kafkaEvent) { - console.warn(`No org-role data for user ${userId}, skipping`) - return - } + try { + // Fetch org-role details for all users in this batch with one query + const rows = await fetchUserOrgRoles(client, chunk, tenantId) + const grouped = groupByUserId(rows) - kafkaEvent = await enrichLocationFields(kafkaEvent) + // Process each user individually for Kafka event building + push + await Promise.all( + Object.entries(grouped).map(async ([userId, userData]) => { + try { + let kafkaEvent = buildKafkaEvent(userData) - await eventBroadcasterKafka('userEvents', { requestBody: kafkaEvent }) + if (!kafkaEvent) { + console.warn(`No org-role data for user ${userId}, skipping`) + return + } - console.log(`Pushed user ${userId}`) - } catch (err) { - console.error(`Failed to push user ${userId}:`, err.message) - } - }) - ) + kafkaEvent = await enrichLocationFields(kafkaEvent) + + await eventBroadcasterKafka('userEvents', { requestBody: kafkaEvent }) + + console.log(`Pushed user ${userId}`) + } catch (err) { + console.error(`Failed to push user ${userId}:`, err.message) + } + }) + ) + } catch (err) { + console.error('Batch failed:', err.message) + } } // Summary diff --git a/src/package.json b/src/package.json index 9d548160f..220093659 100644 --- a/src/package.json +++ b/src/package.json @@ -67,7 +67,7 @@ "moment-timezone": "^0.5.34", "nanoid": "^5.1.5", "p-each-series": "^2.1.0", - "p-limit": "^6.2.0", + "p-limit": "^2.3.0", "path-to-regexp": "^6.2.1", "pg": "^8.11.0", "pg-hstore": "^2.3.4",