diff --git a/dev/config/examples/kafka.motherduck.idempotent.yml b/dev/config/examples/kafka.motherduck.idempotent.yml new file mode 100644 index 0000000..0eab871 --- /dev/null +++ b/dev/config/examples/kafka.motherduck.idempotent.yml @@ -0,0 +1,83 @@ +commands: + - name: attach to motherduck + sql: | + ATTACH 'md:my_db' + + - name: create events table + sql: | + CREATE TABLE IF NOT EXISTS my_db.events ( + ip VARCHAR, + event VARCHAR, + properties_city VARCHAR, + properties_country VARCHAR, + timestamp TIMESTAMP, + type VARCHAR, + userId VARCHAR + ) + + - name: create events metadata table + sql: | + CREATE TABLE IF NOT EXISTS my_db.events_metadata ( + partition INTEGER, + "offset" BIGINT, + topic VARCHAR, + updated_at TIMESTAMP DEFAULT now(), + PRIMARY KEY (topic, partition) + ) +pipeline: + name: kafka-motherduck-sink + description: "Sinks data from kafka to motherduck" + batch_size: {{ SQLFLOW_BATCH_SIZE|default(100000) }} + + source: + type: kafka + kafka: + brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}] + group_id: motherduck-sink + auto_offset_reset: earliest + topics: + - "input-user-clicks-motherduck" + + handler: + type: "handlers.InferredMemBatch" + sql: | + BEGIN TRANSACTION; + + CREATE OR REPLACE TEMPORARY TABLE filtered_batch AS + SELECT b.* + FROM batch b + LEFT JOIN my_db.events_metadata em + ON b.kafka_topic = em.topic + AND b.kafka_partition = em.partition + WHERE em."offset" IS NULL + OR b.kafka_offset > em."offset"; + + INSERT INTO my_db.events + SELECT + ip, + event, + properties ->> 'city' AS properties_city, + properties ->> 'country' AS properties_country, + CAST(timestamp AS TIMESTAMP) AS timestamp, + type, + userId + FROM filtered_batch; + + INSERT INTO my_db.events_metadata + (partition, "offset", topic) + SELECT + kafka_partition AS partition, + MAX(kafka_offset) AS "offset", + kafka_topic AS topic, + FROM filtered_batch + WHERE kafka_offset IS NOT NULL + GROUP BY kafka_partition, kafka_topic + ON CONFLICT (topic, 
partition) + DO UPDATE SET + "offset" = EXCLUDED."offset", + updated_at = now(); + + COMMIT; + + sink: + type: noop diff --git a/sqlflow/handlers.py b/sqlflow/handlers.py index 8843952..1297ec8 100644 --- a/sqlflow/handlers.py +++ b/sqlflow/handlers.py @@ -101,8 +101,14 @@ def init(self): self.rows = [] return self - def write(self, bs): + def write(self, bs, offset=None, partition=None, topic=None): o = self.deserializer.decode(bs) + if offset is not None: + o['kafka_offset'] = offset + if partition is not None: + o['kafka_partition'] = partition + if topic is not None: + o['kafka_topic'] = topic self.rows.append(o) def invoke(self) -> Optional[pa.Table]: diff --git a/sqlflow/pipeline.py b/sqlflow/pipeline.py index ab92703..7ca8485 100644 --- a/sqlflow/pipeline.py +++ b/sqlflow/pipeline.py @@ -204,7 +204,12 @@ def _consume_loop(self, max_msgs=None): try: msgObj = msg.value().decode() - self.handler.write(msgObj) + self.handler.write( + msgObj, + offset=msg.offset(), + partition=msg.partition(), + topic=msg.topic() + ) except Exception as e: self._stats.num_errors += 1 error_counter.add( diff --git a/sqlflow/sources/base.py b/sqlflow/sources/base.py index b85df52..b6eee92 100644 --- a/sqlflow/sources/base.py +++ b/sqlflow/sources/base.py @@ -6,12 +6,23 @@ logger = logging.getLogger(__name__) class Message: - def __init__(self, value: bytes): + def __init__(self, value: bytes, topic: str | None = None, partition: int | None = None, offset: int | None = None): self._value = value + self._topic = topic + self._partition = partition + self._offset = offset def value(self) -> bytes: return self._value + def topic(self) -> str | None: + return self._topic + + def partition(self) -> int | None: + return self._partition + + def offset(self) -> int | None: + return self._offset class Source(ABC): """ diff --git a/sqlflow/sources/kafka.py b/sqlflow/sources/kafka.py index e394944..11ff212 100644 --- a/sqlflow/sources/kafka.py +++ b/sqlflow/sources/kafka.py @@ -35,7 +35,10 @@ def stream(self) -> 
typing.Iterable[Message | None]: if msg is None: yield None elif not msg.error(): - yield Message(msg.value()) + yield Message(value=msg.value(), + topic=msg.topic(), + partition=msg.partition(), + offset=msg.offset()) elif msg.error().code() == KafkaError._PARTITION_EOF: logger.warning( '%% %s [%d] reached end at offset %d'.format(