Walkthrough

Adds mandatory Kafka `groupId` validation when Kafka is enabled. Updates the Kafka health-check signature to accept `groupId`, and appends a process-id suffix to the topic and group id used by the check.
Sequence Diagram(s)

```mermaid
sequenceDiagram
  autonumber
  actor App
  participant HC as HealthCheck
  participant Val as ConfigValidator
  participant K as KafkaService
  App->>HC: run()
  HC->>Val: validate(config)
  alt Kafka enabled
    Val-->>HC: requires url, topic, groupId
    opt Missing groupId
      Val-->>HC: throw "Missing 'groupId' for enabled service: kafka"
      HC-->>App: fail early
    end
    opt All present
      HC->>K: check(kafkaUrl, topicName, groupId)
      note right of K #DDEBF7: derive uniqueTopicName = topic + "-" + pid<br/>derive uniqueGroupId = groupId + "-" + pid
      K->>K: ensureTopicExists(uniqueTopicName)
      K->>K: produce message -> uniqueTopicName
      K->>K: consume from uniqueTopicName using uniqueGroupId
      K-->>HC: true / false
      HC-->>App: result
    end
  else Kafka disabled
    Val-->>HC: ok
    HC-->>App: proceed with other checks
  end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
health-check/services/kafka.js (1)
86-94: kafka-node `Consumer` ignores `groupId`; use `ConsumerGroup` to honor group semantics.

The options object for `kafka.Consumer` does not support `groupId`. If you need per-process isolation via consumer groups, you should use `kafka.ConsumerGroup` (or `ConsumerGroupStream`). As written, `uniqueGroupId` is unused by the underlying client.
Minimal change to use `ConsumerGroup`:

```diff
- const consumer = new kafka.Consumer(
-   client,
-   [{ topic: uniqueTopicName, partition: 0 }],
-   {
-     groupId: uniqueGroupId,
-     autoCommit: true,
-     fromOffset: false,
-   }
- );
+ const consumer = new kafka.ConsumerGroup(
+   {
+     kafkaHost: kafkaUrl,
+     groupId: uniqueGroupId,
+     autoCommit: true,
+     fromOffset: 'latest',
+   },
+   [uniqueTopicName]
+ );
```

Note: the `ConsumerGroup.close` signature is `close(cb)` (no force boolean). See the cleanup comments below.
health-check/index.js (1)
35-42: Include `groupId` in the Kafka health check invocation.

The `kafka.check` signature now requires three arguments (`url`, `topic`, and `groupId`), but the current call only passes two, causing `groupId` to be `undefined`.

- File: health-check/index.js
- Line: 38

Apply the following diff:

```diff
- const healthy = await kafka.check(config.checks.kafka.url, config.checks.kafka.topic)
+ const healthy = await kafka.check(
+   config.checks.kafka.url,
+   config.checks.kafka.topic,
+   config.checks.kafka.groupId
+ )
```
🧹 Nitpick comments (5)
health-check/services/kafka.js (4)
96-123: Cleanup and timeout handling should match the `ConsumerGroup` API.

If you switch to `ConsumerGroup`, adjust close calls accordingly:

```diff
- consumer.close(true, () => {
+ consumer.close(() => {
    client.close();
    resolve(false);
  });
```

And similarly in the success and error handlers:

```diff
- consumer.close(true, () => {
+ consumer.close(() => {
    client.close();
    resolve(true);
  });
```

```diff
- consumer.close(true, () => {
+ consumer.close(() => {
    client.close();
    resolve(false);
  });
```
74-84: Produce-before-consume race can drop the test message; start the consumer first, then produce.

With `fromOffset: false`/`'latest'`, creating the consumer after producing risks missing the message. Start the consumer, wait for it to be ready, then send the message.
High-level approach:
- Instantiate the consumer (ConsumerGroup recommended).
- On consumer 'connect' (or after subscription), send via producer.
- Listen for the message and resolve.
I can provide a concrete refactor if you’d like to go this route.
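A concrete sketch of that consumer-first ordering, with the kafka-node client objects injected so the flow can be exercised without a broker. The `checkRoundTrip` name and the injection style are illustrative, not from the codebase; only the 'connect'/'message'/'error' event names follow kafka-node's consumer API.

```javascript
// Sketch: subscribe first, produce only once the consumer is connected.
// `consumer` and `producer` are assumed to expose kafka-node-style events
// ('connect', 'message', 'error'); any EventEmitter-compatible stub works.
function checkRoundTrip(consumer, producer, payloads, timeoutMs = 5000) {
  return new Promise((resolve) => {
    const timer = setTimeout(() => resolve(false), timeoutMs);
    const done = (ok) => { clearTimeout(timer); resolve(ok); };
    consumer.on('error', () => done(false));
    consumer.on('message', () => done(true)); // test message arrived
    consumer.on('connect', () => {
      // Consumer is subscribed; now it is safe to produce.
      producer.send(payloads, (err) => { if (err) done(false); });
    });
  });
}
```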
52-53: Use a separator when suffixing `process.pid`, for readability and to avoid ambiguous names.

For example, `topic-123` is unambiguous, whereas `topic123` could be read as `topic12` with pid `3`. The earlier diff adds a hyphen; consider adopting that style for both `uniqueTopicName` and `uniqueGroupId`.
27-45: Creating per-process topics can clutter the cluster; prefer a shared topic with a unique consumer group.

Per-process unique topics require topic-creation permissions and can quickly bloat metadata. For a health check, a single well-known topic plus a per-process unique `groupId` (or a unique message key) usually suffices.
If isolation is desired, use:

- topic: the configured shared topic
- groupId: `${baseGroupId}-${process.pid}`

This reduces admin overhead while maintaining isolation.
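Under that shared-topic approach, the name derivation collapses to something like the following sketch. The `deriveHealthCheckNames` helper is an illustrative name, not an existing function in the module.

```javascript
// Sketch: keep the configured topic shared, make only the consumer group
// unique per process. No topic-creation permissions needed at check time.
function deriveHealthCheckNames(topicName, baseGroupId) {
  return {
    topic: topicName, // shared, well-known health-check topic
    groupId: `${baseGroupId}-${process.pid}`, // per-process isolation
  };
}
```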
health-check/index.js (1)
124-137: Docstring nit: include 'topic' and 'groupId' in the Kafka config description.

The JSDoc still says "Kafka config with `enabled` and `url`", but Kafka now requires `topic` and `groupId` as well.

```diff
- * @param {Object} [config.checks.kafka] - Kafka config with `enabled` and `url`.
+ * @param {Object} [config.checks.kafka] - Kafka config with `enabled`, `url`, `topic`, and `groupId`.
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- health-check/index.js (1 hunks)
- health-check/package.json (1 hunks)
- health-check/services/kafka.js (3 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
health-check/index.js (1)
health-check/services/kafka.js (1)
`kafka` (8-8)
health-check/services/kafka.js (1)
health-check/index.js (1)
`kafka` (37-37)
🔇 Additional comments (2)
health-check/package.json (1)
3-3: Version bump looks good. No issues spotted with bumping to 0.0.5.
health-check/index.js (1)
166-168: Good addition: enforce `groupId` when the Kafka health check is enabled. Validation is consistent with requiring `url` and `topic`.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
health-check/services/kafka.js (1)
74-93: Ensure the Kafka consumer is subscribed before producing, to avoid missing messages.

The current code instantiates the consumer only after `producer.send` completes, with `fromOffset: false`, which by default (when no committed offset exists) starts at the "latest" offset and skips the message you just sent. This can lead to intermittent timeouts in your health check.

Please refactor the block in health-check/services/kafka.js (lines 74-93) to subscribe the consumer first, then produce. Two recommended approaches:

1. Use `ConsumerGroup` and subscribe up front:

```js
const consumer = new kafka.ConsumerGroup(
  {
    kafkaHost: kafkaUrl,
    groupId: uniqueGroupId,
    fromOffset: 'latest', // ensures new messages only
    sessionTimeout: 10000,
  },
  [uniqueTopicName]
);

consumer.on('connect', () => {
  console.log('[Kafka Health Check] Consumer connected; now producing');
  producer.send(payloads, (err, data) => { /* … */ });
});
```

2. Create a plain `Consumer` before sending (with `fromOffset: 'latest'`), then call `producer.send` in its `on('connect')` handler.

Both ensure the consumer is listening when the message arrives. This refactor is essential for reliable health checks.
🧹 Nitpick comments (4)
health-check/services/kafka.js (4)
49-49: Nit: add spaces in the function signature; also drop the redundant `async` on a function that returns its own Promise.

Minor style fix that also avoids the async + `new Promise` anti-pattern.

Apply:

```diff
-async function check(kafkaUrl,topicName,groupId) {
+function check(kafkaUrl, topicName, groupId) {
```
51-53: Avoid cross-host collisions: include the hostname (or a short uuid) in the suffix, not just the pid.

PIDs can collide across hosts. If multiple instances run on different machines, topic/group names may clash.

Apply:

```diff
- const pidSuffix = `-${process.pid}`;
- const uniqueTopicName = `${topicName}${pidSuffix}`;
- const uniqueGroupId = `${groupId}${pidSuffix}`;
+ const pidSuffix = `-${require('os').hostname()}-${process.pid}`;
+ const uniqueTopicName = `${topicName}${pidSuffix}`;
+ const uniqueGroupId = `${groupId}${pidSuffix}`;
```

If you prefer not to import in place, add at top level:

```js
const os = require('os');
```

and then:

```diff
- const pidSuffix = `-${process.pid}`;
+ const pidSuffix = `-${os.hostname()}-${process.pid}`;
```
54-54: Don't log the full `kafkaUrl`; potential credential leak. [security]

If SASL credentials or query params are ever embedded, this prints secrets. Log only hosts or a redacted URL.

Example:

```diff
- console.log(`[Kafka Health Check] Connecting to Kafka at ${kafkaUrl}`);
+ const firstBroker = String(kafkaUrl).split(',')[0];
+ console.log(`[Kafka Health Check] Connecting to Kafka at ${firstBroker} (brokers list redacted)`);
```
58-58: Creating a per-process topic on every health check can cause topic sprawl.

If the process restarts frequently (new pid each time), this will create many topics that persist. Consider a single shared health-check topic and only vary the `groupId`/consumer identity, or add a config flag to skip auto-create in hardened clusters.
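The opt-in auto-create idea might look like this sketch. The `autoCreateTopic` key and the `resolveHealthTopic` helper are hypothetical, not existing config or code.

```javascript
// Hypothetical config flag so hardened clusters can skip per-process
// topic creation and reuse a shared health-check topic instead.
function resolveHealthTopic(kafkaConfig) {
  const { topic, autoCreateTopic = false } = kafkaConfig;
  if (!autoCreateTopic) {
    return { topic, create: false }; // shared topic, no creation attempted
  }
  return { topic: `${topic}-${process.pid}`, create: true }; // isolated topic
}
```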
📜 Review details
📒 Files selected for processing (1)
- health-check/services/kafka.js (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: MallanagoudaB
PR: ELEVATE-Project/utils#342
File: health-check/services/kafka.js:49-54
Timestamp: 2025-08-20T10:33:46.335Z
Learning: In the ELEVATE-Project/utils health-check module, validation for Kafka configuration including groupId is performed in the parent file (health-check/index.js) in the validateHealthConfig function, not in the individual service files like health-check/services/kafka.js.
📚 Learning: 2025-08-20T10:33:46.335Z
Learnt from: MallanagoudaB
PR: ELEVATE-Project/utils#342
File: health-check/services/kafka.js:49-54
Timestamp: 2025-08-20T10:33:46.335Z
Learning: In the ELEVATE-Project/utils health-check module, validation for Kafka configuration including groupId is performed in the parent file (health-check/index.js) in the validateHealthConfig function, not in the individual service files like health-check/services/kafka.js.
Applied to files:
health-check/services/kafka.js
📚 Learning: 2025-08-20T10:33:46.335Z
Learnt from: MallanagoudaB
PR: ELEVATE-Project/utils#342
File: health-check/services/kafka.js:49-54
Timestamp: 2025-08-20T10:33:46.335Z
Learning: In the ELEVATE-Project/utils health-check module, when updating function signatures to add new parameters, both the service file and the parent file (health-check/index.js) that calls the function need to be updated to pass all required parameters.
Applied to files:
health-check/services/kafka.js
🧬 Code Graph Analysis (1)
health-check/services/kafka.js (1)
health-check/index.js (1)
`kafka` (37-37)
🔇 Additional comments (1)
health-check/services/kafka.js (1)
67-68: LGTM: producing to the pid-suffixed topic is correct. The payload correctly targets the unique topic instance.
The code under review:

```js
[{ topic: uniqueTopicName, partition: 0 }],
{
  autoCommit: true,
  groupId: uniqueGroupId,
  fromOffset: false,
}
```
💡 Verification agent
🧩 Analysis chain
Use ConsumerGroup for group semantics; groupId/fromOffset options are for ConsumerGroup, not plain Consumer.
With kafka.Consumer, the groupId is ignored for proper consumer group coordination, and fromOffset semantics differ. Switching to ConsumerGroup aligns with the intent of using a unique groupId.
Example replacement:
```js
const consumer = new kafka.ConsumerGroup(
  {
    kafkaHost: kafkaUrl,
    groupId: uniqueGroupId,
    fromOffset: 'latest', // or 'earliest' if you need to catch up
  },
  [uniqueTopicName]
);
```
);To double-check the option support across versions, please verify against the kafka-node docs:
🌐 Web query:
In kafka-node, which constructor supports groupId and fromOffset options: Consumer or ConsumerGroup? Provide the official option lists for each.
💡 Result:
In kafka-node, the ConsumerGroup constructor supports both the groupId and fromOffset options, while the basic Consumer constructor does not support a groupId (it is designed for single-consumer scenarios and offset management per partition)[7][2].
Official Option Lists
ConsumerGroup options (from official kafka-node documentation and source[7]):
- kafkaHost: Kafka broker address
- batch: Batch settings (optional)
- ssl: Enable SSL/TLS (default: false)
- groupId: Consumer group id (required for ConsumerGroup)
- sessionTimeout: Timeout for group session
- protocol: Partition assignment protocol (array; e.g., ['roundrobin'])
- encoding: Message encoding (default: 'utf8')
- fromOffset: Start offset for new groups ('latest', 'earliest', or 'none')
- commitOffsetsOnFirstJoin: Whether to commit offsets when joining a group for the first time
- outOfRangeOffset: How to handle OutOfRangeOffset errors (same options as fromOffset)
- onRebalance: Optional callback for rebalance events
Consumer options (from official kafka-node documentation and source[2]):
- host: Zookeeper host (deprecated in favor of kafkaHost)
- kafkaHost: Kafka broker address
- groupId: Not supported in basic Consumer
- id: Consumer id
- autoCommit: Whether to auto-commit offsets (default: true)
- autoCommitIntervalMs: Interval for auto-commit (default: 5000)
- fetchMaxWaitMs: Maximum wait time for fetch
- fetchMinBytes: Minimum bytes to fetch
- fetchMaxBytes: Maximum bytes to fetch
- fromOffset: Can be used in payloads, but as an option, only relevant for payload setup, not as a global consumer setting
- encoding: Message encoding (default: 'utf8')
Summary Table
| Option | Consumer | ConsumerGroup |
|---|---|---|
| groupId | Not supported | Supported |
| fromOffset | Payload-level only | Supported (global option) |
ConsumerGroup should be used whenever you need to consume messages as part of a group (with rebalancing, partition assignment, and offset management for multiple consumers using a groupId)[7][2]. Consumer is for more direct, low-level, single-client consumption without coordinated offset management and group features.
Key References:
- ConsumerGroup: supports both groupId and fromOffset options[7].
- Consumer: does not support groupId (no group coordination), and fromOffset is only relevant as a per-payload property, not a consumer-wide option[2].
Citations:
- 1: https://www.npmjs.com/package/no-kafka
- 2: https://www.npmjs.com/package/kafka-node
- 3: https://www.redpanda.com/guides/kafka-architecture-kafka-consumer-group
- 4: https://docs.confluent.io/platform/current/clients/consumer.html
- 5: https://docs.confluent.io/kafka-clients/javascript/current/overview.html
- 6: https://docs.nestjs.com/microservices/kafka
- 7: https://github.com/SOHU-Co/kafka-node
- 8: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
- 9: https://kafka.js.org/docs/consuming
- 10: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
Use ConsumerGroup for proper group semantics
The kafka.Consumer constructor does not recognize groupId (it’s ignored) and only supports fromOffset on a per-payload basis. To enable true consumer-group coordination (rebalancing, offset management, etc.), switch to kafka.ConsumerGroup.
Please update in health-check/services/kafka.js at lines 88–92:
- Remove the plain `Consumer` call with a `groupId`/`fromOffset` options object.
- Replace with a `ConsumerGroup` instantiation:
```diff
- const consumer = new kafka.Consumer(
-   client,
-   [{ topic: uniqueTopicName, partition: 0 }],
-   {
-     groupId: uniqueGroupId,
-     fromOffset: false,
-   }
- );
+ const consumer = new kafka.ConsumerGroup(
+   {
+     kafkaHost: kafkaUrl,
+     groupId: uniqueGroupId,
+     fromOffset: 'latest', // or 'earliest' if you need to replay messages
+   },
+   [uniqueTopicName]
+ );
```

This ensures `groupId` and the global `fromOffset` are applied correctly.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```js
const consumer = new kafka.ConsumerGroup(
  {
    kafkaHost: kafkaUrl,
    groupId: uniqueGroupId,
    fromOffset: 'latest', // or 'earliest' if you need to replay messages
  },
  [uniqueTopicName]
);
```
🤖 Prompt for AI Agents
In health-check/services/kafka.js around lines 88–92, the code uses new
kafka.Consumer with a groupId/fromOffset options object which is ignored;
replace that with a kafka.ConsumerGroup instantiation. Remove the existing
kafka.Consumer call and instead create a new kafka.ConsumerGroup passing a
proper options object (including kafkaHost/bootstrapServers, groupId, and
fromOffset as a global setting) and the topic(s) (either as an array of topic
names or payloads); this will ensure groupId and fromOffset are applied
correctly and enable proper consumer-group semantics (rebalancing and offset
management).