[Do not accept] Added new dummy kafka topic after user update #871
komalm wants to merge 1 commit into ELEVATE-Project:develop from
Conversation
Walkthrough

The pull request extends the event-broadcasting system to emit user submission events to Kafka. It introduces three new environment variables for Kafka topic configuration and feature toggling, adds a new Kafka publisher function, routes submission events through the event broadcaster, and emits submission events during user updates.
Sequence Diagram

```mermaid
sequenceDiagram
    actor User
    participant UserService as User Service
    participant EventBroadcaster as Event Broadcaster
    participant KafkaComm as Kafka Communication
    participant Kafka as Kafka Topic
    User->>UserService: Update user
    UserService->>UserService: Process update
    UserService->>EventBroadcaster: Emit submissionEvents
    EventBroadcaster->>EventBroadcaster: Check EVENT_OBSERVATION_UPDATE_KAFKA_EVENTS
    EventBroadcaster->>KafkaComm: pushUserSubmissionToKafka(requestBody)
    KafkaComm->>KafkaComm: Build payload with topic from ENV
    KafkaComm->>Kafka: Send message
    Kafka-->>KafkaComm: Response (topic, partition, offset)
    KafkaComm-->>EventBroadcaster: Return response
    EventBroadcaster-->>UserService: Event handled
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@src/envVariables.js`:
- Around line 455-469: Remove the unused SUBMISSION_KAFKA_TOPIC entry and
consolidate to a single topic variable used by the publisher
(EVENT_OBSERVATION_UPDATE_KAFKA_TOPIC), change its default to a generic
placeholder (e.g., 'submission.topic' or empty string) instead of the
customer-specific 'brac.observation.submission.dev', and add a requiredIf
condition tied to EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS so the env validator
forces the topic to be set when submission events are enabled; also verify the
publisher in kafka-communication.js reads EVENT_OBSERVATION_UPDATE_KAFKA_TOPIC
(not SUBMISSION_KAFKA_TOPIC).
In `@src/generics/kafka-communication.js`:
- Around line 83-126: The pushUserSubmissionToKafka function currently logs PII
by printing the first 200 chars of the serialized message and uses the wrong env
var name; fix it by removing/redacting payload content logging and only log
non-PII metadata (payload size, message count, topic, producer status) and by
replacing process.env.EVENT_ENABLE_OBSERVATION_UPDATE_KAFKA_EVENTS with the
correct variable process.env.EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS (update all
occurrences in pushUserSubmissionToKafka) so logs do not expose user data and
reflect the real feature flag.
In `@src/helpers/eventBroadcasterMain.js`:
- Around line 41-42: The branch handling 'submissionEvents-kafka' in
isEventEnabled is checking the wrong environment variable name
(EVENT_ENABLE_OBSERVATION_UPDATE_KAFKA_EVENTS) so the flag is effectively always
enabled; change that reference to the canonical name
EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS used in src/envVariables.js (or, if you
intend the other name to be canonical, rename the env var in envVariables.js
instead) so the condition correctly reads
process.env.EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS !== 'false' in the
isEventEnabled switch case for 'submissionEvents-kafka'.
In `@src/services/user.js`:
- Around line 205-246: Replace the hardcoded test data in the
newRequestBody.userProfile with real fields pulled from the update
result/current record (use processDbResponse and currentUser.dataValues where
appropriate), set newRequestBody.oldValues to the pre-update state (use the
existing user or oldValues variable computed earlier) and
newRequestBody.newValues to processDbResponse, and remove the large
commented-out debug block; update the code around newRequestBody,
processDbResponse, currentUser, user, oldValues and the broadcastEvent call so
the Kafka event emits accurate old vs new values and no hardcoded PII.
🧹 Nitpick comments (2)
src/helpers/eventBroadcasterMain.js (1)

92-114: Dead code in `broadcastEvent`: the `broadcaster` variable is assigned but never used. Inside both branches of the `forEach` (Lines 106–108), a `const broadcaster` is assigned but never referenced. This appears to be leftover scaffolding. Either log or remove the variable.

src/generics/kafka-communication.js (1)

83-126: Excessive verbose logging; consider using structured logging with appropriate levels. This function has ~20 `console.log` calls that will fire on every submission event. While `pushUserEventsToKafka` follows the same pattern, this level of verbosity is more suited to temporary debugging than production code. Consider using a logger with configurable log levels (e.g., `debug` level for the detailed output, `info` for the summary).
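The configurable-log-level suggestion could be as small as the sketch below. This is a hypothetical helper, not code from the PR; the `LOG_LEVEL` env var and the level names are assumptions for illustration.

```javascript
// Minimal leveled-logger sketch. LOG_LEVEL and the level names are
// assumptions for illustration, not part of the PR's code.
const LEVELS = { debug: 0, info: 1, warn: 2, error: 3 }

function makeLogger(minLevel = process.env.LOG_LEVEL || 'info') {
	const min = LEVELS[minLevel] ?? LEVELS.info
	const logger = {}
	for (const name of Object.keys(LEVELS)) {
		// Each method prints only when its level meets the configured minimum.
		logger[name] = (...args) => {
			if (LEVELS[name] >= min) console.log(`[${name}]`, ...args)
		}
	}
	return logger
}

const log = makeLogger('info')
log.debug('detailed payload dump') // suppressed at info level
log.info('submission event sent')  // printed
```

With this in place, the per-field payload dumps in `pushUserSubmissionToKafka` could move to `debug`, leaving a single `info` summary in production logs.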
```js
SUBMISSION_KAFKA_TOPIC: {
	message: 'Kafka topic for Submission Event',
	optional: true,
	default: 'brac.observation.submission.dev',
},
EVENT_OBSERVATION_UPDATE_KAFKA_TOPIC: {
	message: 'Kafka topic for Observation Update Event',
	optional: true,
	default: 'brac.observation.submission.dev',
},
EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS: {
	message: 'Key to toggle submission kafka event',
	optional: true,
	default: 'true',
},
```
Naming confusion and unused variable: clean up topic configuration.

Several issues with these three new variables:

- `SUBMISSION_KAFKA_TOPIC` (Line 455) is never referenced in `src/generics/kafka-communication.js`; the Kafka publisher actually reads `EVENT_OBSERVATION_UPDATE_KAFKA_TOPIC` (Line 460). This dead variable will confuse operators.
- The default value `'brac.observation.submission.dev'` embeds a customer-specific prefix and a `.dev` environment suffix. Other topics in this file use generic defaults (e.g., `'mentoring.topic'`, `'notificationtopic'`). This will accidentally publish to a dev topic if the env var isn't overridden in production.
- Missing `requiredIf`: existing topic variables like `EVENT_USER_KAFKA_TOPIC` (Line 441) use `requiredIf` to enforce that the topic is set when events are enabled. The new topic variables lack this, so enabling submission events without setting a topic will silently publish to the hardcoded dev default.
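One way the consolidation could look. The `requiredIf` shape below mirrors what the review describes for `EVENT_USER_KAFKA_TOPIC` but is an assumption about the validator's API, and the small validator is only a sketch of the enforcement logic, not the project's actual implementation.

```javascript
// Hypothetical consolidated entries; field names follow the envVariables.js
// conventions shown above, and the requiredIf shape is an assumption.
const submissionEnvSpec = {
	EVENT_OBSERVATION_UPDATE_KAFKA_TOPIC: {
		message: 'Kafka topic for user submission events',
		optional: true,
		default: 'submission.topic', // generic default, no customer/env suffix
		requiredIf: { key: 'EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS', value: 'true' },
	},
	EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS: {
		message: 'Key to toggle submission kafka event',
		optional: true,
		default: 'true',
	},
}

// Sketch of how a validator might enforce requiredIf: a variable becomes
// mandatory when the variable it depends on resolves to the trigger value.
function validateSpec(spec, env) {
	const missing = []
	for (const [name, rule] of Object.entries(spec)) {
		const cond = rule.requiredIf
		if (cond && (env[cond.key] ?? spec[cond.key]?.default) === cond.value && !env[name]) {
			missing.push(name)
		}
	}
	return missing
}
```

Under this scheme, enabling submission events without setting a topic fails validation at startup instead of silently publishing to a default topic.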
```js
const pushUserSubmissionToKafka = async (message) => {
	try {
		console.log('📤 [OBSERVATION UPDATE KAFKA] ===== SENDING OBSERVATION UPDATE EVENT =====')
		console.log('📤 [OBSERVATION UPDATE KAFKA] Target Topic:', process.env.EVENT_OBSERVATION_UPDATE_KAFKA_TOPIC)
		console.log(
			'📤 [OBSERVATION UPDATE KAFKA] Kafka Producer Status:',
			kafkaProducer ? 'Connected' : 'Not Connected'
		)
		console.log(
			'📤 [OBSERVATION UPDATE KAFKA] Event Enabled:',
			process.env.EVENT_ENABLE_OBSERVATION_UPDATE_KAFKA_EVENTS
		)

		const messageString = JSON.stringify(message)
		console.log('📤 [OBSERVATION UPDATE KAFKA] Payload Size:', messageString.length, 'bytes')
		console.log('📤 [OBSERVATION UPDATE KAFKA] First 200 chars:', messageString.substring(0, 200) + '...')

		const payload = {
			topic: process.env.EVENT_OBSERVATION_UPDATE_KAFKA_TOPIC,
			messages: [{ value: messageString }],
		}
		console.log('📤 [OBSERVATION UPDATE KAFKA] Kafka Payload Prepared:')
		console.log('📤 [OBSERVATION UPDATE KAFKA] Topic:', payload.topic)
		console.log('📤 [OBSERVATION UPDATE KAFKA] Messages Count:', payload.messages.length)

		console.log('📤 [OBSERVATION UPDATE KAFKA] Sending to Kafka...')
		const response = await pushPayloadToKafka(payload)

		console.log('📤 [OBSERVATION UPDATE KAFKA] ✅ Kafka Response Received:')
		console.log('📤 [OBSERVATION UPDATE KAFKA] Success:', response ? 'Yes' : 'No')
		if (response && response[0]) {
			console.log('📤 [OBSERVATION UPDATE KAFKA] Topic:', response[0].topicName)
			console.log('📤 [OBSERVATION UPDATE KAFKA] Partition:', response[0].partition)
			console.log('📤 [OBSERVATION UPDATE KAFKA] Offset:', response[0].baseOffset)
		}
		console.log('📤 [OBSERVATION UPDATE KAFKA] ===== OBSERVATION UPDATE EVENT COMPLETED =====')

		return response
	} catch (error) {
		console.log('📤 [OBSERVATION UPDATE KAFKA] ❌ ERROR in pushUserSubmissionToKafka:', error.message)
		console.log('📤 [OBSERVATION UPDATE KAFKA] ❌ Error stack:', error.stack)
		return error
	}
}
```
PII leakage in logs and env-var name mismatch.

Two concerns with this new function:

- Line 98 logs the first 200 characters of the serialized message payload. Given that the payload (constructed in `src/services/user.js`) contains user profile data (name, DOB, phone code, etc.), this will write PII to application logs on every invocation: a compliance/privacy risk.
- Line 93 logs `process.env.EVENT_ENABLE_OBSERVATION_UPDATE_KAFKA_EVENTS`, but `src/envVariables.js` defines `EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS`. This will always log `undefined`, which is misleading during debugging.
Proposed fix for the env-var reference and PII logging

```diff
-	console.log(
-		'📤 [OBSERVATION UPDATE KAFKA] Event Enabled:',
-		process.env.EVENT_ENABLE_OBSERVATION_UPDATE_KAFKA_EVENTS
-	)
+	console.log(
+		'📤 [OBSERVATION UPDATE KAFKA] Event Enabled:',
+		process.env.EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS
+	)

-	console.log('📤 [OBSERVATION UPDATE KAFKA] First 200 chars:', messageString.substring(0, 200) + '...')
+	console.log('📤 [OBSERVATION UPDATE KAFKA] Payload Size:', messageString.length, 'bytes')
```
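Going further than dropping the one line, the logging can be restricted to non-PII metadata entirely. This is a hedged sketch: the helper name is invented, not code from the PR, and it assumes the payload shape `{ topic, messages: [{ value }] }` used by the function above.

```javascript
// Hypothetical helper: summarize a Kafka payload without exposing message
// contents, so application logs never carry user PII.
function kafkaPayloadMetadata(payload) {
	return {
		topic: payload.topic,
		messageCount: payload.messages.length,
		// Total serialized size in bytes across all messages.
		payloadBytes: payload.messages.reduce(
			(total, m) => total + Buffer.byteLength(m.value, 'utf8'),
			0
		),
	}
}

// Usage: log only the summary object, never the serialized message itself.
// console.log('📤 [SUBMISSION KAFKA]', kafkaPayloadMetadata(payload))
```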
```js
case 'submissionEvents-kafka':
	return process.env.EVENT_ENABLE_OBSERVATION_UPDATE_KAFKA_EVENTS !== 'false'
```
Environment variable name mismatch — feature flag will never work.
isEventEnabled checks process.env.EVENT_ENABLE_OBSERVATION_UPDATE_KAFKA_EVENTS, but src/envVariables.js (line 465) defines EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS. Since the referenced variable is never set, it will always be undefined, and undefined !== 'false' is true, making the toggle permanently enabled.
Pick one consistent name across both files.
Proposed fix (assuming envVariables.js is the source of truth)

```diff
 case 'submissionEvents-kafka':
-	return process.env.EVENT_ENABLE_OBSERVATION_UPDATE_KAFKA_EVENTS !== 'false'
+	return process.env.EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS !== 'false'
```
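The corrected toggle can be exercised in isolation. This sketch passes the env object in as a parameter for testability; the real code reads `process.env` directly.

```javascript
// Mirrors the proposed fix: the event is enabled unless the flag is
// explicitly the string 'false'. An unset flag therefore means enabled,
// which is also why the misnamed variable made the toggle permanently on.
function isSubmissionEventEnabled(env = process.env) {
	return env.EVENT_ENABLE_SUBMISSION_KAFKA_EVENTS !== 'false'
}
```

Note that `undefined !== 'false'` is `true`, so as long as the code checks a variable name that is never defined, the feature flag can never disable the event.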
```js
const newRequestBody = {
	entity: 'user',
	eventType: 'update',
	entityId: processDbResponse?.id,
	oldValues: processDbResponse,
	newValues: processDbResponse,
	userProfile: {
		// newValues
		id: '3087',
		email_verified: 'false',
		name: 'one farabi updated via datapipline',
		// username: "farabi",
		phone_code: '91',
		about: 'One Kafka update the value via datapipline',
		dob: '22-12-2000',
		meta: null,
		// share_link: null,
		// status: "ACTIVE",
		// email_verified: "false",
		// name: currentName,
		// // username: "farabi",
		// phone_code: "91",
		// about: "my self farabi Three",
		// dob: "22-12-2000",
		// share_link: null,
		// status: "ACTIVE",
		// image: null,
		// has_accepted_terms_and_conditions: false,
		// languages: null,
		// preferred_language: "en",
		// tenant_code: "brac",
		// meta: {
		// 	about: 'Kafka update the value via datapipline updated',
		// 	dob: '22-12-2000 updated',
		// },
		created_at: currentUser.dataValues?.created_at || null,
		updated_at: currentUser.dataValues?.updated_at || new Date(),
		// deleted_at: null
	},
}

broadcastEvent('submissionEvents', { requestBody: newRequestBody, isInternal: true })
```
Hardcoded test/dummy data in production code path: must not be merged.

The `userProfile` object (Lines 213–221) contains hardcoded PII and test values (`id: '3087'`, `name: 'one farabi updated via datapipline'`, `dob: '22-12-2000'`, etc.) instead of deriving them from `processDbResponse` or `currentUser`. This will emit incorrect, static data to Kafka for every user update.

Additionally:

- `oldValues` and `newValues` (Lines 209–210) are both set to `processDbResponse`, making them identical. This defeats the purpose of tracking what changed; `oldValues` should use the pre-update user state (the `user` variable from Line 48 or the `oldValues` already computed at Line 164).
- Lines 222–239 contain a large block of commented-out debug code that should be removed.

As per coding guidelines, `src/services/**` is core business logic; correctness is essential here.
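A hedged sketch of the shape the event builder could take once the hardcoded values are removed. The parameter names (`oldUser`, `updatedUser`) and the field selection are illustrative assumptions; the real code should source these from the service's existing `user` and `processDbResponse` values.

```javascript
// Derive old and new values from real records instead of hardcoding them.
// Field selection is illustrative, not the service's actual schema.
function buildSubmissionEvent(oldUser, updatedUser) {
	return {
		entity: 'user',
		eventType: 'update',
		entityId: updatedUser.id,
		oldValues: oldUser,     // pre-update state, so consumers can diff
		newValues: updatedUser, // post-update state
		userProfile: {
			id: updatedUser.id,
			name: updatedUser.name,
			about: updatedUser.about,
			created_at: updatedUser.created_at ?? null,
			updated_at: updatedUser.updated_at ?? new Date(),
		},
	}
}
```

Keeping `oldValues` and `newValues` distinct is what lets downstream Kafka consumers compute what actually changed in the update.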