Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
2026-04-09T23:03:23.221455Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="c4a41a08-9a97-4c6d-a3d7-ceca70f53914" flow_names="vehicle_metrics"
2026-04-09T23:03:23.374361Z INFO sift_stream::stream::run: created new run run_id="8df2e922-ee43-4605-8f40-12e7216ca11c" run_name="robot_vehicle_20260409T230322_6bc64e03_run"
2026-04-09T23:03:23.377097Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:23.377405Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:23.377428Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:23.377439Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:23.378133Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:23.979642Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="7e7b9616-ac1f-4f9f-88f8-cab8b9131a08" flow_names="sift-stream-metrics-flow"
2026-04-09T23:03:23.980268Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:23.980347Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:23.980364Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:23.980380Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:23.980398Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:23.980473Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:23.980516Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:23.980725Z INFO sift_stream::stream::tasks: metrics streaming task started sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:23.981724Z INFO sift_stream::stream::mode::ingestion_config: flow 'vehicle_metrics' being ingested for the first time sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:23.982611Z INFO sift_stream::stream::mode::ingestion_config: flow 'sift-stream-metrics-flow' being ingested for the first time sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:46.705518Z INFO sift_stream::stream::tasks: metrics streaming task shutting down
2026-04-09T23:03:46.705536Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down
2026-04-09T23:03:46.705544Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started
2026-04-09T23:03:46.705566Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down
2026-04-09T23:03:46.705600Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down
2026-04-09T23:03:46.705920Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:46.705932Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down
2026-04-09T23:03:46.705936Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started
2026-04-09T23:03:46.705963Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:46.738349Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:03:46.738359Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:46.746265Z INFO sift_stream::backup::disk::async_manager: no backup files need to be re-ingested
2026-04-09T23:03:46.746347Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e
2026-04-09T23:03:46.746379Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02
2026-04-09T23:08:34.739983Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="d8a9512e-983f-4d33-9073-554371acb452" flow_names="vehicle_metrics"
2026-04-09T23:08:34.888672Z INFO sift_stream::stream::run: created new run run_id="62f79930-0916-4ddf-ba04-80877e1d7332" run_name="robot_vehicle_20260409T230833_8345f9cf_run"
2026-04-09T23:08:34.891757Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:34.891972Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:34.891987Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:34.891996Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:34.892478Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:35.399524Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="ea9b4507-d1f1-4786-be16-e14445c90a30" flow_names="sift-stream-metrics-flow"
2026-04-09T23:08:35.400015Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:35.400038Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:35.400059Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:35.400078Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:35.400087Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:35.400107Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:35.400154Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:35.400869Z INFO sift_stream::stream::tasks: metrics streaming task started sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:35.401121Z INFO sift_stream::stream::mode::ingestion_config: flow 'vehicle_metrics' being ingested for the first time sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:35.402720Z INFO sift_stream::stream::mode::ingestion_config: flow 'sift-stream-metrics-flow' being ingested for the first time sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:41.290497Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down
2026-04-09T23:08:41.290531Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:41.290573Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down
2026-04-09T23:08:41.290666Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started
2026-04-09T23:08:41.290695Z INFO sift_stream::stream::tasks: metrics streaming task shutting down
2026-04-09T23:08:41.290755Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down
2026-04-09T23:08:41.290771Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:41.290887Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down
2026-04-09T23:08:41.290936Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started
2026-04-09T23:08:41.325415Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:41.325437Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:08:41.332876Z INFO sift_stream::backup::disk::async_manager: no backup files need to be re-ingested
2026-04-09T23:08:41.332919Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741
2026-04-09T23:08:41.332953Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=6b985fea-312a-4577-9845-59390e02574d
2026-04-09T23:11:49.333957Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="552c88f3-a459-432a-b9d8-08218ff66e01" flow_names="vehicle_metrics"
2026-04-09T23:11:49.438671Z INFO sift_stream::stream::run: created new run run_id="b33d1236-14c6-4ea7-8a1c-f419dee7d338" run_name="robot_vehicle_20260409T231148_993c090c_run"
2026-04-09T23:11:49.441563Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:11:49.441913Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:11:49.441928Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:11:49.441938Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:11:49.442231Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:11:49.938519Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="288d15a5-28f1-4002-8747-992ec9bf93bb" flow_names="sift-stream-metrics-flow"
2026-04-09T23:11:49.939053Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:11:49.939108Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:11:49.939146Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:11:49.939168Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:11:49.939183Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:11:49.939193Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:11:49.939278Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:11:49.939460Z INFO sift_stream::stream::tasks: metrics streaming task started sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:11:49.940313Z INFO sift_stream::stream::mode::ingestion_config: flow 'vehicle_metrics' being ingested for the first time sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:11:49.941057Z INFO sift_stream::stream::mode::ingestion_config: flow 'sift-stream-metrics-flow' being ingested for the first time sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:12:03.267990Z INFO sift_stream::stream::tasks: metrics streaming task shutting down
2026-04-09T23:12:03.268002Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:12:03.268031Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down
2026-04-09T23:12:03.268056Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down
2026-04-09T23:12:03.268060Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started
2026-04-09T23:12:03.268071Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:12:03.268104Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down
2026-04-09T23:12:03.268142Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down
2026-04-09T23:12:03.268408Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started
2026-04-09T23:12:03.300586Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:12:03.300597Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
2026-04-09T23:12:03.305209Z INFO sift_stream::backup::disk::async_manager: no backup files need to be re-ingested
2026-04-09T23:12:03.305228Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96
2026-04-09T23:12:03.305251Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4
76 changes: 43 additions & 33 deletions python/examples/ingestion-tutorial/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,50 +178,60 @@ async def main() -> None:
ingestion_config=ingestion_config,
run=run,
) as ingest_client:
# Continue streaming until the user terminates the program
while True:
now = datetime.now(timezone.utc)

# Generate mock telemetry values
# ---------------------------------------------------------
# In a real system, these would come from sensors,
# hardware interfaces, or production metrics.
velocity = random.uniform(0, 10)
temperature = random.uniform(20, 40)

# Build and send a FlowPy directly using sift_stream_bindings types
# ---------------------------------------------------------
# Using FlowPy, ChannelValuePy, ValuePy, and TimeValuePy directly
# avoids the CPU-bound conversion overhead of the ergonomic
# flow_config.as_flow() helper.
await ingest_client.send(
FlowPy(
flow_name=FLOW_NAME,
timestamp=TimeValuePy.from_timestamp_millis(int(now.timestamp() * 1000)),
values=[
ChannelValuePy(name="velocity", value=ValuePy.Double(velocity)),
ChannelValuePy(name="temperature", value=ValuePy.Double(temperature)),
],
# Continue streaming until the user terminates the program.
# On Ctrl+C, Python's asyncio raises CancelledError (not
# KeyboardInterrupt) inside awaiting coroutines. Catching it
# here lets the async-with context manager exit cleanly and
# call finish() via __aexit__.
try:
while True:
now = datetime.now(timezone.utc)

# Generate mock telemetry values
# ---------------------------------------------------------
# In a real system, these would come from sensors,
# hardware interfaces, or production metrics.
velocity = random.uniform(0, 10)
temperature = random.uniform(20, 40)

# Build and send a FlowPy directly using sift_stream_bindings types
# ---------------------------------------------------------
# Using FlowPy, ChannelValuePy, ValuePy, and TimeValuePy directly
# avoids the CPU-bound conversion overhead of the ergonomic
# flow_config.as_flow() helper.
await ingest_client.send(
FlowPy(
flow_name=FLOW_NAME,
timestamp=TimeValuePy.from_timestamp_millis(int(now.timestamp() * 1000)),
values=[
ChannelValuePy(name="velocity", value=ValuePy.Double(velocity)),
ChannelValuePy(name="temperature", value=ValuePy.Double(temperature)),
],
)
)
)

print(
f"[SENT {now.isoformat()}] "
f"velocity={velocity:.2f} m/s | "
f"temperature={temperature:.2f} C"
)
print(
f"[SENT {now.isoformat()}] "
f"velocity={velocity:.2f} m/s | "
f"temperature={temperature:.2f} C"
)

# Control sampling rate
await asyncio.sleep(SEND_INTERVAL_SECONDS)
# Control sampling rate
await asyncio.sleep(SEND_INTERVAL_SECONDS)
except (KeyboardInterrupt, asyncio.CancelledError):
pass

print("Streaming session closed.")


# Standard Python entry point
# ---------------------------------------------------------------------
# asyncio.run() starts the async ingestion workflow.
# The outer exception handler suppresses noise from the async shutdown
# sequence that may occur after the streaming client has already been
# finished by the context manager.
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
except (KeyboardInterrupt, RuntimeError):
print("\nStreaming stopped by user.")
Loading