Add Kafka message metadata fields (offset, partition, topic) to batch processing for deduplication support #141
Conversation
```diff
     return self

-def write(self, bs):
+def write(self, bs, offset=None, partition=None, topic=None):
```
I think this is great for right now, and I love that you have concrete use cases for this!
We may have to rethink this in a future version. Kafka is, by far, the most popular source, but the dream is to have other sources available as well.
I think this is great; just commenting for the future!
Pull Request Overview
This PR adds Kafka message metadata fields (offset, partition, topic) to the Message class and handlers to enable idempotent consumer patterns and deduplication logic. The changes expose Kafka-specific metadata that can be used to track processed messages and avoid reprocessing.
- Extended Message class to include topic, partition, and offset metadata fields
- Updated Kafka source to populate new metadata fields when creating Message objects
- Modified pipeline and handlers to pass and utilize Kafka metadata for deduplication
- Added example configuration demonstrating idempotent consumption patterns
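The metadata-injection step in the InferredMemBatch handler can be sketched roughly as follows. This is a minimal illustration, not the handler's actual implementation: it assumes JSON messages decoded to dicts, and uses the `kafka_topic`/`kafka_partition`/`kafka_offset` field names that the example deduplication SQL in this PR refers to.

```python
import json

def inject_kafka_metadata(raw: bytes, topic: str, partition: int, offset: int) -> dict:
    """Decode a JSON message and attach Kafka metadata fields.

    Hypothetical helper sketching the idea; the real InferredMemBatch
    handler in sqlflow may differ in names and structure.
    """
    obj = json.loads(raw)
    # Field names match the kafka_* columns used in the example SQL.
    obj["kafka_topic"] = topic
    obj["kafka_partition"] = partition
    obj["kafka_offset"] = offset
    return obj

row = inject_kafka_metadata(b'{"user_id": 1}', "clicks", 0, 42)
```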
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| sqlflow/sources/base.py | Extended Message class constructor and added getter methods for Kafka metadata |
| sqlflow/sources/kafka.py | Updated Kafka message creation to include topic, partition, and offset metadata |
| sqlflow/pipeline.py | Modified message processing to pass Kafka metadata to handlers |
| sqlflow/handlers.py | Enhanced InferredMemBatch handler to inject Kafka metadata into decoded objects |
| dev/config/examples/kafka.motherduck.idempotent.yml | Added example configuration for idempotent Kafka consumption with deduplication |
```diff
 class Message:
-    def __init__(self, value: bytes):
+    def __init__(self, value: bytes, topic: str | None, partition: int | None, offset: int | None):
```
The constructor signature introduces a breaking change by making previously optional parameters required. Consider using keyword-only arguments or providing default values (None) to maintain backward compatibility with existing Message instantiations.
```diff
-    def __init__(self, value: bytes, topic: str | None, partition: int | None, offset: int | None):
+    def __init__(self, value: bytes, topic: str | None = None, partition: int | None = None, offset: int | None = None):
```
Yes! I need to figure out why CI didn't run on a fork! 👀
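With the suggested defaults, the backward-compatibility point can be shown concretely. A minimal sketch, using the signature from the diff above; the field storage and getter names are assumptions, not the actual sqlflow class body:

```python
from __future__ import annotations

class Message:
    """Sketch of the extended Message with optional Kafka metadata.

    The constructor signature comes from the suggested diff; the
    attribute and getter names below are hypothetical.
    """

    def __init__(
        self,
        value: bytes,
        topic: str | None = None,
        partition: int | None = None,
        offset: int | None = None,
    ):
        self._value = value
        self._topic = topic
        self._partition = partition
        self._offset = offset

    def value(self) -> bytes:
        return self._value

    def topic(self) -> str | None:
        return self._topic

    def partition(self) -> int | None:
        return self._partition

    def offset(self) -> int | None:
        return self._offset

# Existing call sites keep working with no changes...
legacy = Message(b"payload")
# ...while the Kafka source can attach metadata when it has it.
tagged = Message(b"payload", topic="clicks", partition=3, offset=100)
```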
```sql
SELECT
    kafka_partition AS partition,
    MAX(kafka_offset) AS "offset",
    kafka_topic AS topic,
```
Trailing comma after 'kafka_topic AS topic' in the SELECT statement. This will cause a syntax error in standard SQL.
```diff
-    kafka_topic AS topic,
+    kafka_topic AS topic
```
ooo
https://duckdb.org/docs/stable/sql/dialect/friendly_sql.html
<3 I had no idea that DuckDB does this!
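The query above computes a high-watermark offset per (topic, partition), which downstream dedup logic can compare against. Here is a runnable sketch of that pattern using the stdlib sqlite3 driver in place of DuckDB/MotherDuck (note that sqlite, unlike DuckDB's friendly SQL, would reject the trailing comma, so it is omitted here):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (kafka_topic TEXT, kafka_partition INTEGER, kafka_offset INTEGER)"
)
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [("clicks", 0, 10), ("clicks", 0, 12), ("clicks", 1, 7)],
)

# High-watermark per (topic, partition): only replay offsets above these.
rows = conn.execute(
    """
    SELECT
        kafka_partition AS partition,
        MAX(kafka_offset) AS "offset",
        kafka_topic AS topic
    FROM events
    GROUP BY kafka_topic, kafka_partition
    """
).fetchall()
```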
This PR exposes additional Kafka message metadata fields in the InferredMemBatch handler to enable idempotent consumer patterns and deduplication logic, addressing issue #140.
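The idempotent-consumer pattern this metadata enables boils down to: skip any message at or below the stored watermark for its (topic, partition). A minimal sketch with hypothetical names, not sqlflow's actual dedup code:

```python
def should_process(watermarks: dict, topic: str, partition: int, offset: int) -> bool:
    """Return True if this message is new, advancing the watermark.

    watermarks maps (topic, partition) -> highest processed offset.
    Hypothetical helper illustrating the dedup check.
    """
    key = (topic, partition)
    if offset <= watermarks.get(key, -1):
        return False  # already processed; skip on replay
    watermarks[key] = offset
    return True

wm = {}
# A replayed/out-of-order stream: offsets 11 and 9 are duplicates.
results = [should_process(wm, "clicks", 0, o) for o in (10, 11, 11, 9, 12)]
```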