Merged
dev/config/examples/kafka.motherduck.idempotent.yml (new file, 83 additions)
@@ -0,0 +1,83 @@
commands:
  - name: attach to motherduck
    sql: |
      ATTACH 'md:my_db'

  - name: create events table
    sql: |
      CREATE TABLE IF NOT EXISTS my_db.events (
          ip VARCHAR,
          event VARCHAR,
          properties_city VARCHAR,
          properties_country VARCHAR,
          timestamp TIMESTAMP,
          type VARCHAR,
          userId VARCHAR
      )

  - name: create events metadata table
    sql: |
      CREATE TABLE IF NOT EXISTS my_db.events_metadata (
          partition INTEGER,
          "offset" BIGINT,
          topic VARCHAR,
          updated_at TIMESTAMP DEFAULT now(),
          PRIMARY KEY (topic, partition)
      )

pipeline:
  name: kafka-motherduck-sink
  description: "Sinks data from kafka to motherduck"
  batch_size: {{ SQLFLOW_BATCH_SIZE|default(100000) }}

  source:
    type: kafka
    kafka:
      brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}]
      group_id: motherduck-sink
      auto_offset_reset: earliest
      topics:
        - "input-user-clicks-motherduck"

  handler:
    type: "handlers.InferredMemBatch"
    sql: |
      BEGIN TRANSACTION;

      CREATE OR REPLACE TEMPORARY TABLE filtered_batch AS
      SELECT b.*
      FROM batch b
      LEFT JOIN my_db.events_metadata em
        ON b.kafka_topic = em.topic
        AND b.kafka_partition = em.partition
      WHERE em."offset" IS NULL
         OR b.kafka_offset > em."offset";

      INSERT INTO my_db.events
      SELECT
          ip,
          event,
          properties ->> 'city' AS properties_city,
          properties ->> 'country' AS properties_country,
          CAST(timestamp AS TIMESTAMP) AS timestamp,
          type,
          userId
      FROM filtered_batch;

      INSERT INTO my_db.events_metadata
          (partition, "offset", topic)
      SELECT
          kafka_partition AS partition,
          MAX(kafka_offset) AS "offset",
          kafka_topic AS topic,

> **Copilot AI** (Sep 4, 2025): Trailing comma after `kafka_topic AS topic` at the end of the SELECT list. In most SQL dialects this would be a syntax error.
>
> Suggested change:
> ```
> -          kafka_topic AS topic,
> +          kafka_topic AS topic
> ```

> **@turbolytics** (Owner, Sep 4, 2025): ooo
>
> https://duckdb.org/docs/stable/sql/dialect/friendly_sql.html
>
> <3 I had no idea that duckdb does this! (DuckDB's "friendly SQL" accepts trailing commas, so this parses fine.)

      FROM filtered_batch
      WHERE kafka_offset IS NOT NULL
      GROUP BY kafka_partition, kafka_topic
      ON CONFLICT (topic, partition)
      DO UPDATE SET
          "offset" = EXCLUDED."offset",
          updated_at = now();

      COMMIT;

  sink:
    type: noop
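
The handler's transaction implements idempotent replay with a per-(topic, partition) offset watermark: an anti-join filters out already-seen offsets, then an upsert advances the watermark. The same pattern can be sketched with Python's stdlib `sqlite3` (schema and names below are illustrative, not SQLFlow's; the real pipeline runs this in DuckDB/MotherDuck):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE events (user_id TEXT, event TEXT);
    CREATE TABLE events_metadata (
        partition INTEGER,
        "offset" INTEGER,
        topic TEXT,
        PRIMARY KEY (topic, partition)
    );
    CREATE TABLE batch (
        kafka_topic TEXT, kafka_partition INTEGER, kafka_offset INTEGER,
        user_id TEXT, event TEXT
    );
""")

def sink_batch(rows):
    """Insert only rows past the recorded watermark, then advance it."""
    con.executemany("INSERT INTO batch VALUES (?, ?, ?, ?, ?)", rows)
    # Anti-join: keep rows whose offset is beyond the last committed one.
    con.execute("""
        INSERT INTO events
        SELECT b.user_id, b.event
        FROM batch b
        LEFT JOIN events_metadata em
          ON b.kafka_topic = em.topic AND b.kafka_partition = em.partition
        WHERE em."offset" IS NULL OR b.kafka_offset > em."offset"
    """)
    # Upsert the high-water mark per (topic, partition).
    con.execute("""
        INSERT INTO events_metadata (partition, "offset", topic)
        SELECT kafka_partition, MAX(kafka_offset), kafka_topic
        FROM batch
        WHERE kafka_offset IS NOT NULL
        GROUP BY kafka_partition, kafka_topic
        ON CONFLICT (topic, partition) DO UPDATE SET "offset" = excluded."offset"
    """)
    con.execute("DELETE FROM batch")
    con.commit()

batch = [("t", 0, 0, "u1", "click"), ("t", 0, 1, "u2", "click")]
sink_batch(batch)
sink_batch(batch)  # replayed batch is filtered out; no duplicates land
count = con.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

Replaying the same batch a second time inserts nothing, which is what makes consumer-group rebalances and redeliveries safe.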
sqlflow/handlers.py (7 additions, 1 deletion)
@@ -101,8 +101,14 @@ def init(self):
         self.rows = []
         return self

-    def write(self, bs):
+    def write(self, bs, offset=None, partition=None, topic=None):

> **@turbolytics** (Owner): I think this is great for right now, and love that you have concrete use cases for this!
>
> We may have to rethink this in a future version. Kafka is, by far, the most popular source, but the dream is to have other sources available as well.
>
> I think this is great, just commenting for future!

         o = self.deserializer.decode(bs)
+        if offset is not None:
+            o['kafka_offset'] = offset
+        if partition is not None:
+            o['kafka_partition'] = partition
+        if topic is not None:
+            o['kafka_topic'] = topic
         self.rows.append(o)

     def invoke(self) -> Optional[pa.Table]:
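
A stripped-down illustration of what the new `write()` path does: each decoded record is tagged with its Kafka coordinates so the handler's SQL can later dedupe on them. This is a hypothetical simplification (the real `InferredMemBatch` also owns a pluggable deserializer and converts rows to an Arrow table); `json.loads` stands in for the decoder:

```python
import json

class MemBatchSketch:
    """Simplified sketch of the metadata-enriching write() above."""
    def __init__(self):
        self.rows = []

    def write(self, bs, offset=None, partition=None, topic=None):
        o = json.loads(bs)
        # Attach Kafka coordinates so downstream SQL can filter on them.
        if offset is not None:
            o['kafka_offset'] = offset
        if partition is not None:
            o['kafka_partition'] = partition
        if topic is not None:
            o['kafka_topic'] = topic
        self.rows.append(o)

h = MemBatchSketch()
h.write('{"event": "click"}', offset=42, partition=0, topic="input-user-clicks")
row = h.rows[0]
```

Because each keyword defaults to `None`, callers that pass only the payload still work and simply get untagged rows.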
sqlflow/pipeline.py (6 additions, 1 deletion)
@@ -204,7 +204,12 @@ def _consume_loop(self, max_msgs=None):

         try:
             msgObj = msg.value().decode()
-            self.handler.write(msgObj)
+            self.handler.write(
+                msgObj,
+                offset=msg.offset(),
+                partition=msg.partition(),
+                topic=msg.topic()
+            )
         except Exception as e:
             self._stats.num_errors += 1
             error_counter.add(
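
The consume loop threads the coordinates straight from the consumer message into the handler. A self-contained sketch with stand-in classes (`FakeMessage` mimics the accessor methods the loop uses; both names are illustrative, not part of SQLFlow):

```python
class FakeMessage:
    """Mimics the message accessors (value/topic/partition/offset) used by the loop."""
    def __init__(self, value, topic, partition, offset):
        self._value, self._topic = value, topic
        self._partition, self._offset = partition, offset

    def value(self): return self._value
    def topic(self): return self._topic
    def partition(self): return self._partition
    def offset(self): return self._offset

class RecordingHandler:
    """Records write() calls so we can inspect what the loop passed through."""
    def __init__(self):
        self.calls = []

    def write(self, bs, offset=None, partition=None, topic=None):
        self.calls.append((bs, offset, partition, topic))

# One iteration of the loop body, as in _consume_loop above.
msg = FakeMessage(b'{"event":"click"}', "clicks", 3, 17)
handler = RecordingHandler()
handler.write(
    msg.value().decode(),
    offset=msg.offset(),
    partition=msg.partition(),
    topic=msg.topic()
)
call = handler.calls[0]
```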
sqlflow/sources/base.py (12 additions, 1 deletion)
@@ -6,12 +6,23 @@
logger = logging.getLogger(__name__)

 class Message:
-    def __init__(self, value: bytes):
+    def __init__(self, value: bytes, topic: str | None, partition: int | None, offset: int | None):

> **Copilot AI** (Sep 4, 2025): The constructor signature introduces a breaking change by making previously optional parameters required. Consider using keyword-only arguments or providing default values (None) to maintain backward compatibility with existing Message instantiations.
>
> Suggested change:
> ```
> -    def __init__(self, value: bytes, topic: str | None, partition: int | None, offset: int | None):
> +    def __init__(self, value: bytes, topic: str | None = None, partition: int | None = None, offset: int | None = None):
> ```

> **@turbolytics** (Owner): Yes! I need to figure out why CI didn't run on a fork! 👀

         self._value = value
+        self._topic = topic
+        self._partition = partition
+        self._offset = offset

     def value(self) -> bytes:
         return self._value

+    def topic(self) -> str | None:
+        return self._topic
+
+    def partition(self) -> int | None:
+        return self._partition
+
+    def offset(self) -> int | None:
+        return self._offset

class Source(ABC):
"""
sqlflow/sources/kafka.py (4 additions, 1 deletion)
@@ -35,7 +35,10 @@ def stream(self) -> typing.Iterable[Message | None]:
             if msg is None:
                 yield None
             elif not msg.error():
-                yield Message(msg.value())
+                yield Message(value=msg.value(),
+                              topic=msg.topic(),
+                              partition=msg.partition(),
+                              offset=msg.offset())
             elif msg.error().code() == KafkaError._PARTITION_EOF:
                 logger.warning(
                     '%% %s [%d] reached end at offset %d'.format(