Skip to content

kafka-topic-change#341

Merged
VISHNUDAS-tunerlabs merged 2 commits intomasterfrom
KafkaFix
Aug 19, 2025
Merged

kafka-topic-change#341
VISHNUDAS-tunerlabs merged 2 commits intomasterfrom
KafkaFix

Conversation

@MallanagoudaB
Copy link
Copy Markdown
Collaborator

@MallanagoudaB MallanagoudaB commented Aug 19, 2025

Summary by CodeRabbit

  • New Features
    • Kafka health check now supports a configurable topic and can be targeted via configuration.
  • Bug Fixes
    • Health check configuration validation now requires both Kafka URL and topic when Kafka is enabled.
  • Chores
    • Package version updated to 0.0.4.

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Aug 19, 2025

Walkthrough

Kafka health check now requires both a URL and a topic; index.js passes both values. Kafka service signature changed to accept a dynamic topic and uses it throughout. Added Kafka validation in config. Package version bumped 0.0.3 → 0.0.4 and minor formatting/style tweaks elsewhere.

Changes

Cohort / File(s) Summary
Handler wiring
health-check/index.js
Updated to call kafka.check(config.checks.kafka.url, config.checks.kafka.topic); formatting adjusted for bullmq call and removed an extra blank line after formatResponse(result).
Kafka service (dynamic topic)
health-check/services/kafka.js
Changed check(kafkaUrl)check(kafkaUrl, topicName). Replaced static topic constant with topicName in ensureTopicExists, producer payloads, and consumer subscription; behavior and error handling preserved.
Validation
health-check/index.js (validateHealthConfig)
Added validation requiring both config.checks.kafka.url and config.checks.kafka.topic when kafka.enabled is true.
Package metadata
health-check/package.json
Version updated from 0.0.3 to 0.0.4. No dependency or API export changes.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant HealthCheck as health-check/index.js
  participant KafkaSvc as services/kafka.js
  participant KafkaCluster as Kafka

  Client->>HealthCheck: GET /health
  HealthCheck->>KafkaSvc: check(kafkaUrl, topicName)
  KafkaSvc->>KafkaCluster: connect
  KafkaSvc->>KafkaCluster: ensureTopicExists(topicName)
  KafkaSvc->>KafkaCluster: produce(test message → topicName)
  KafkaCluster-->>KafkaSvc: message consumed (or timeout)
  KafkaSvc->>KafkaCluster: cleanup (disconnect)
  KafkaSvc-->>HealthCheck: result (healthy/unhealthy)
  HealthCheck-->>Client: HTTP 200/503 with details

  rect rgba(230,245,255,0.5)
  note over HealthCheck,KafkaSvc: Changed: topicName provided dynamically to KafkaSvc.check
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

I twitch my whiskers, tap the key,
"Send the topic too," says little me.
Messages bound on nimble feet,
I wait, I listen—timely, sweet.
Version nudged, the checks all sing; hooray, hop on, systems neat! 🐇✨

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between c92876a and b959b06.

📒 Files selected for processing (1)
  • health-check/index.js (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • health-check/index.js
✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch KafkaFix

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1> [!CAUTION]

Some comments are outside the diff and can’t be posted inline due to platform limits.

🔭 Outside diff range comments (1)
health-check/services/kafka.js (1)

49-127: Flaky flow: producing before consuming risks missing the message. Add input validation and single-settle guard; create the consumer before sending.

Current order creates the consumer after the message is produced, so with fromOffset: false the consumer can miss the produced message. Also, there’s no guard against multiple resolves, and the Promise executor is marked async unnecessarily. The patch below addresses all three:

  • Validate topicName upfront.
  • Remove async from the Promise executor.
  • Create the consumer before sending to ensure it sees the message.
  • Add a single-settle guard and consistent cleanup.
-async function check(kafkaUrl,topicName) {
-	return new Promise(async (resolve) => {
+function check(kafkaUrl, topicName) {
+	return new Promise((resolve) => {
-		console.log(`[Kafka Health Check] Connecting to Kafka at ${kafkaUrl}`);
-		const client = new kafka.KafkaClient({ kafkaHost: kafkaUrl });
+		if (!topicName || typeof topicName !== 'string' || topicName.trim().length === 0) {
+			console.error('[Kafka Health Check] Missing `topicName`');
+			return resolve(false);
+		}
+
+		console.log(`[Kafka Health Check] Connecting to Kafka at ${kafkaUrl}`);
+		const client = new kafka.KafkaClient({ kafkaHost: kafkaUrl });
 
-		try {
-			await ensureTopicExists(client, topicName);
-		} catch (err) {
-			client.close();
-			return resolve(false);
-		}
+		ensureTopicExists(client, topicName).then(() => {
+			const messageId = `health-check-${uuidv4()}`;
+			const payloads = [{ topic: topicName, messages: messageId }];
+
+			const producer = new kafka.Producer(client);
+			let settled = false;
+			const settle = (ok) => {
+				if (settled) return;
+				settled = true;
+				clearTimeout(timeout);
+				consumer.close(true, () => {
+					// Close producer if available, then client
+					try {
+						producer.close(() => client.close());
+					} catch (_) {
+						client.close();
+					}
+					resolve(ok);
+				});
+			};
 
-		const messageId = `health-check-${uuidv4()}`;
-		const payloads = [
-			{
-				topic: topicName,
-				messages: messageId,
-			},
-		];
+			const consumer = new kafka.Consumer(
+				client,
+				[{ topic: topicName, partition: 0 }],
+				{ autoCommit: true, fromOffset: false }
+			);
 
-		const producer = new kafka.Producer(client);
+			const timeout = setTimeout(() => {
+				console.error('[Kafka Health Check] Timed out waiting for message');
+				settle(false);
+			}, 10000);
 
-		producer.on('ready', () => {
-			console.log(`[Kafka Health Check] Producer ready. Sending message: ${messageId}`);
-
-			producer.send(payloads, (err, data) => {
-				if (err) {
-					console.error('[Kafka Health Check] Error sending message:', err);
-					client.close();
-					return resolve(false);
-				}
-
-				console.log('[Kafka Health Check] Message sent:', data);
-
-				const consumer = new kafka.Consumer(
-					client,
-					[{ topic: topicName, partition: 0 }],
-					{
-						autoCommit: true,
-						fromOffset: false,
-					}
-				);
-
-				const timeout = setTimeout(() => {
-					console.error('[Kafka Health Check] Timed out waiting for message');
-					consumer.close(true, () => {
-						client.close();
-						resolve(false);
-					});
-				}, 10000); 
-
-				consumer.on('message', (message) => {
-					console.log('[Kafka Health Check] Received message:', message.value);
-					if (message.value === messageId) {
-						clearTimeout(timeout);
-						consumer.close(true, () => {
-							client.close();
-							resolve(true);
-						});
-					}
-				});
-
-				consumer.on('error', (err) => {
-					console.error('[Kafka Health Check] Consumer error:', err);
-					clearTimeout(timeout);
-					consumer.close(true, () => {
-						client.close();
-						resolve(false);
-					});
-				});
-			});
-		});
+			consumer.on('message', (message) => {
+				console.log('[Kafka Health Check] Received message:', message.value);
+				if (message.value === messageId) {
+					settle(true);
+				}
+			});
+
+			consumer.on('error', (err) => {
+				console.error('[Kafka Health Check] Consumer error:', err);
+				settle(false);
+			});
+
+			producer.on('ready', () => {
+				console.log(`[Kafka Health Check] Producer ready. Sending message: ${messageId}`);
+				producer.send(payloads, (err, data) => {
+					if (err) {
+						console.error('[Kafka Health Check] Error sending message:', err);
+						return settle(false);
+					}
+					console.log('[Kafka Health Check] Message sent:', data);
+				});
+			});
+
+			producer.on('error', (err) => {
+				console.error('[Kafka Health Check] Producer error:', err);
+				settle(false);
+			});
+		}).catch((err) => {
+			client.close();
+			return resolve(false);
+		});
 
-		producer.on('error', (err) => {
-			console.error('[Kafka Health Check] Producer error:', err);
-			client.close();
-			return resolve(false);
-		});
 	});
 }
🧹 Nitpick comments (2)
health-check/package.json (1)

11-19: Consider removing duplicate Kafka clients to reduce footprint.

Both kafka-node and kafkajs are listed as dependencies, but this package (health-check) exclusively uses kafka-node. If kafkajs isn’t used anywhere in this package, consider removing it to shrink install size and reduce maintenance.

health-check/services/kafka.js (1)

11-47: Optional: reduce metadata scope when checking topics.

Using loadMetadataForTopics([]) fetches metadata for all topics. If broker ACLs and topic counts are large, consider requesting metadata only for the target topic (e.g., loadMetadataForTopics([topicName])) and handling the “unknown topic” case to decide on creation. This reduces payload and latency.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between c7fdf2b and c92876a.

📒 Files selected for processing (3)
  • health-check/index.js (1 hunks)
  • health-check/package.json (1 hunks)
  • health-check/services/kafka.js (3 hunks)
🔇 Additional comments (2)
health-check/package.json (1)

3-3: Version bump looks good.

Package version updated to 0.0.4 — no issues.

health-check/index.js (1)

38-38: All kafka.check invocations updated
Verified that the sole call site in health-check/index.js (line 38) passes both url and topic; no other references exist.

@MallanagoudaB
Copy link
Copy Markdown
Collaborator Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Aug 19, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@VISHNUDAS-tunerlabs VISHNUDAS-tunerlabs merged commit 48c7bd8 into master Aug 19, 2025
1 check passed
@coderabbitai coderabbitai bot mentioned this pull request Aug 20, 2025
This was referenced Oct 25, 2025
@coderabbitai coderabbitai bot mentioned this pull request Mar 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants