diff --git a/python/examples/ingestion-tutorial/logs/sift_stream_bindings.log.2026-04-09 b/python/examples/ingestion-tutorial/logs/sift_stream_bindings.log.2026-04-09 new file mode 100644 index 000000000..4e824b326 --- /dev/null +++ b/python/examples/ingestion-tutorial/logs/sift_stream_bindings.log.2026-04-09 @@ -0,0 +1,96 @@ +2026-04-09T23:03:23.221455Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="c4a41a08-9a97-4c6d-a3d7-ceca70f53914" flow_names="vehicle_metrics" +2026-04-09T23:03:23.374361Z INFO sift_stream::stream::run: created new run run_id="8df2e922-ee43-4605-8f40-12e7216ca11c" run_name="robot_vehicle_20260409T230322_6bc64e03_run" +2026-04-09T23:03:23.377097Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:23.377405Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:23.377428Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:23.377439Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:23.378133Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:23.979642Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="7e7b9616-ac1f-4f9f-88f8-cab8b9131a08" flow_names="sift-stream-metrics-flow" +2026-04-09T23:03:23.980268Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:23.980347Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:23.980364Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:23.980380Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:23.980398Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:23.980473Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:23.980516Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:23.980725Z INFO sift_stream::stream::tasks: metrics streaming task started sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:23.981724Z INFO sift_stream::stream::mode::ingestion_config: flow 'vehicle_metrics' being ingested for the first time sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:23.982611Z INFO sift_stream::stream::mode::ingestion_config: flow 'sift-stream-metrics-flow' being ingested for the first time sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:46.705518Z INFO sift_stream::stream::tasks: metrics streaming task shutting down +2026-04-09T23:03:46.705536Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down +2026-04-09T23:03:46.705544Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started +2026-04-09T23:03:46.705566Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down +2026-04-09T23:03:46.705600Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down +2026-04-09T23:03:46.705920Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:46.705932Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down +2026-04-09T23:03:46.705936Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started +2026-04-09T23:03:46.705963Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:46.738349Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:03:46.738359Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:46.746265Z INFO sift_stream::backup::disk::async_manager: no backup files need to be re-ingested +2026-04-09T23:03:46.746347Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=5a01e704-ba75-49fe-a80d-5ff6e38c054e +2026-04-09T23:03:46.746379Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=e2485573-ca83-4367-bbb5-e04a327a4c02 +2026-04-09T23:08:34.739983Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="d8a9512e-983f-4d33-9073-554371acb452" flow_names="vehicle_metrics" +2026-04-09T23:08:34.888672Z INFO sift_stream::stream::run: created new run run_id="62f79930-0916-4ddf-ba04-80877e1d7332" run_name="robot_vehicle_20260409T230833_8345f9cf_run" +2026-04-09T23:08:34.891757Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:34.891972Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:34.891987Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:34.891996Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:34.892478Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:35.399524Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="ea9b4507-d1f1-4786-be16-e14445c90a30" flow_names="sift-stream-metrics-flow" +2026-04-09T23:08:35.400015Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:35.400038Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:35.400059Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:35.400078Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:35.400087Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:35.400107Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:35.400154Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:35.400869Z INFO sift_stream::stream::tasks: metrics streaming task started sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:35.401121Z INFO sift_stream::stream::mode::ingestion_config: flow 'vehicle_metrics' being ingested for the first time sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:35.402720Z INFO sift_stream::stream::mode::ingestion_config: flow 'sift-stream-metrics-flow' being ingested for the first time sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:41.290497Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down +2026-04-09T23:08:41.290531Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:41.290573Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down +2026-04-09T23:08:41.290666Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started +2026-04-09T23:08:41.290695Z INFO sift_stream::stream::tasks: metrics streaming task shutting down +2026-04-09T23:08:41.290755Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down +2026-04-09T23:08:41.290771Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:41.290887Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down +2026-04-09T23:08:41.290936Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started +2026-04-09T23:08:41.325415Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:41.325437Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:08:41.332876Z INFO sift_stream::backup::disk::async_manager: no backup files need to be re-ingested +2026-04-09T23:08:41.332919Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=8a0a2a81-01fd-41f5-a630-e2cf44e56741 +2026-04-09T23:08:41.332953Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=6b985fea-312a-4577-9845-59390e02574d +2026-04-09T23:11:49.333957Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="552c88f3-a459-432a-b9d8-08218ff66e01" flow_names="vehicle_metrics" +2026-04-09T23:11:49.438671Z INFO sift_stream::stream::run: created new run run_id="b33d1236-14c6-4ea7-8a1c-f419dee7d338" run_name="robot_vehicle_20260409T231148_993c090c_run" +2026-04-09T23:11:49.441563Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:11:49.441913Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:11:49.441928Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:11:49.441938Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:11:49.442231Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:11:49.938519Z INFO sift_stream::stream::builder::config_loader: created new ingestion config ingestion_config_id="288d15a5-28f1-4002-8747-992ec9bf93bb" flow_names="sift-stream-metrics-flow" +2026-04-09T23:11:49.939053Z INFO sift_stream::stream::tasks: backup manager task started sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:11:49.939108Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:11:49.939146Z INFO sift_stream::stream::tasks: ingestion task started sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:11:49.939168Z INFO sift_stream::stream::tasks: creating new stream sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:11:49.939183Z INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:11:49.939193Z INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:11:49.939278Z INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:11:49.939460Z INFO sift_stream::stream::tasks: metrics streaming task started sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:11:49.940313Z INFO sift_stream::stream::mode::ingestion_config: flow 'vehicle_metrics' being ingested for the first time sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:11:49.941057Z INFO sift_stream::stream::mode::ingestion_config: flow 'sift-stream-metrics-flow' being ingested for the first time sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:12:03.267990Z INFO sift_stream::stream::tasks: metrics streaming task shutting down +2026-04-09T23:12:03.268002Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:12:03.268031Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down +2026-04-09T23:12:03.268056Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down +2026-04-09T23:12:03.268060Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started +2026-04-09T23:12:03.268071Z INFO sift_stream::stream::tasks: ingestion task shutting down sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:12:03.268104Z INFO sift_stream::backup::disk::async_manager: re-ingestion task shutting down +2026-04-09T23:12:03.268142Z INFO sift_stream::backup::disk::async_manager: backup manager shutting down +2026-04-09T23:12:03.268408Z INFO sift_stream::backup::disk::async_manager: backup manager cleanup started +2026-04-09T23:12:03.300586Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:12:03.300597Z INFO sift_stream::stream::tasks: final stream completed successfully sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 +2026-04-09T23:12:03.305209Z INFO sift_stream::backup::disk::async_manager: no backup files need to be re-ingested +2026-04-09T23:12:03.305228Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=d12c91a4-46b3-4d6a-9320-e4123c3b5a96 +2026-04-09T23:12:03.305251Z INFO sift_stream::stream::mode::ingestion_config: successfully shutdown streaming system sift_stream_id=b9279f02-24d8-4210-ab9f-25494cc89ea4 diff --git a/python/examples/ingestion-tutorial/stream.py b/python/examples/ingestion-tutorial/stream.py index f49b4ced4..3053b81d1 100644 --- a/python/examples/ingestion-tutorial/stream.py +++ b/python/examples/ingestion-tutorial/stream.py @@ -178,41 +178,48 @@ async def main() -> None: ingestion_config=ingestion_config, run=run, ) as ingest_client: - # Continue streaming until the user terminates the program - while True: - now = datetime.now(timezone.utc) - - # Generate mock telemetry values - # --------------------------------------------------------- - # In a real system, these would come from sensors, - # hardware interfaces, or production metrics. - velocity = random.uniform(0, 10) - temperature = random.uniform(20, 40) - - # Build and send a FlowPy directly using sift_stream_bindings types - # --------------------------------------------------------- - # Using FlowPy, ChannelValuePy, ValuePy, and TimeValuePy directly - # avoids the CPU-bound conversion overhead of the ergonomic - # flow_config.as_flow() helper. - await ingest_client.send( - FlowPy( - flow_name=FLOW_NAME, - timestamp=TimeValuePy.from_timestamp_millis(int(now.timestamp() * 1000)), - values=[ - ChannelValuePy(name="velocity", value=ValuePy.Double(velocity)), - ChannelValuePy(name="temperature", value=ValuePy.Double(temperature)), - ], + # Continue streaming until the user terminates the program. + # On Ctrl+C, Python's asyncio raises CancelledError (not + # KeyboardInterrupt) inside awaiting coroutines. Catching it + # here lets the async-with context manager exit cleanly and + # call finish() via __aexit__. + try: + while True: + now = datetime.now(timezone.utc) + + # Generate mock telemetry values + # --------------------------------------------------------- + # In a real system, these would come from sensors, + # hardware interfaces, or production metrics. + velocity = random.uniform(0, 10) + temperature = random.uniform(20, 40) + + # Build and send a FlowPy directly using sift_stream_bindings types + # --------------------------------------------------------- + # Using FlowPy, ChannelValuePy, ValuePy, and TimeValuePy directly + # avoids the CPU-bound conversion overhead of the ergonomic + # flow_config.as_flow() helper. + await ingest_client.send( + FlowPy( + flow_name=FLOW_NAME, + timestamp=TimeValuePy.from_timestamp_millis(int(now.timestamp() * 1000)), + values=[ + ChannelValuePy(name="velocity", value=ValuePy.Double(velocity)), + ChannelValuePy(name="temperature", value=ValuePy.Double(temperature)), + ], + ) ) - ) - print( - f"[SENT {now.isoformat()}] " - f"velocity={velocity:.2f} m/s | " - f"temperature={temperature:.2f} C" - ) + print( + f"[SENT {now.isoformat()}] " + f"velocity={velocity:.2f} m/s | " + f"temperature={temperature:.2f} C" + ) - # Control sampling rate - await asyncio.sleep(SEND_INTERVAL_SECONDS) + # Control sampling rate + await asyncio.sleep(SEND_INTERVAL_SECONDS) + except (KeyboardInterrupt, asyncio.CancelledError): + pass print("Streaming session closed.") @@ -220,8 +227,11 @@ async def main() -> None: # Standard Python entry point # --------------------------------------------------------------------- # asyncio.run() starts the async ingestion workflow. +# The outer exception handler suppresses noise from the async shutdown +# sequence that may occur after the streaming client has already been +# finished by the context manager. if __name__ == "__main__": try: asyncio.run(main()) - except KeyboardInterrupt: + except (KeyboardInterrupt, RuntimeError): print("\nStreaming stopped by user.")