Skip to content

python(feat): Sift Client Ingestion#370

Merged
nathan-sift merged 47 commits intomainfrom
python/sift-client-ingestion
Nov 11, 2025
Merged

python(feat): Sift Client Ingestion#370
nathan-sift merged 47 commits intomainfrom
python/sift-client-ingestion

Conversation

@nathan-sift
Copy link
Copy Markdown
Contributor

@nathan-sift nathan-sift commented Nov 7, 2025

Adds ingestion to SiftClient, using sift_stream_bindings to interface with the underlying SiftStream.

  • Adds ingestion types and classes to provide a user friendly pythonic interface to SiftStream
  • Users may also directly use the sift_stream_binding types during ingestion
  • Users have access to SiftStream's tracing (logs) and metrics
  • Users may configure SiftStream's recovery strategy, including the retry policy and how/if to use backup files used for data re-ingestion if a SiftStream checkpoint indicates a failure to ingest all data

Example

    client = SiftClient(connection_config=connection_config)

    # Ingestion configs are created using SiftClient types

    msg_count_ch = ChannelConfig(
        name="ingest.msg_count",
        data_type=ChannelDataType.UINT_64,
    )

    elapsed_ingest_time_ch = ChannelConfig(
        name="ingest.elapsed_time",
        data_type=ChannelDataType.DOUBLE,
    )

    files_ingested_ch = ChannelConfig(
        name="ingest.backup_files_ingested", data_type=ChannelDataType.UINT_64
    )

    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        client_key="sift_rover_1_v2",
        flows=[
            FlowConfig(
                name="stats", channels=[msg_count_ch, elapsed_ingest_time_ch, files_ingested_ch]
            )
        ],
    )

    added_onboard_flow = False
    onboard_flow = FlowConfig(
        name="onboard_sensors",
        channels=[
            ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
            ChannelConfig(
                name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
            ),
        ],
    )

    run = RunCreate(name="sift_rover-" + str(int(time.time())))

    # Tracing (SiftStream logging) can be configured once per process
    # In this case, we would like to include debug level logs
    # By default, we'll get a rotating set of 7 logs files in the ./logs directory
    tracing_config = TracingConfig.with_file(level="debug")

    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
        tracing_config=tracing_config,
        recovery_strategy=RecoveryStrategyConfig.retry_with_backups(),
    ) as ingest_client:
        msg_count = 0
        while True:
            msg_count += 1

            # Users can access SiftStream metrics
            metrics = ingest_client.get_metrics_snapshot()

            channel_values = [msg_count_ch.as_channel_value(msg_count)]

            # SiftStream does not need all channels in a flow to be present, or in order
            if msg_count % 2 == 0:
                channel_values += [
                    files_ingested_ch.as_channel_value(metrics.backups.files_ingested),
                    elapsed_ingest_time_ch.as_channel_value(metrics.elapsed_secs),
                ]

            # Flow timestamps default to current time if not provided
            flow = Flow(flow="stats", channel_values=channel_values)

            await ingest_client.send(flow=flow)
            
            # New flows can be added at runtime
            if not added_onboard_flow:
                await ingest_client.add_new_flows(flow_configs=[onboard_flow])
                added_onboard_flow = True

            # Flows can also be generated easily from the ingest client
            # Even flows that were added after the initial client init
            flow_config = ingest_client.get_flow_config(flow_name="onboard_sensors")
            flow = flow_config.as_flow(
                timestamp=datetime.now(timezone.utc),
                values={
                    "motor_temp": 50.0 + random.random() * 5.0,
                    "tank_pressure": 2000.0 + random.random() * 100.0,
                },
            )
            await ingest_client.send(flow=flow)

            await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

Output from above code

2025-11-07T22:04:40.815174Z  INFO sift_stream::stream::builder: created new ingestion config ingestion_config_id="61380ba6-f84a-416f-9389-c14afa761ccc" flow_names="stats"
2025-11-07T22:04:40.820612Z  INFO sift_stream::stream::run: created new run run_id="6465f026-0928-4dbe-8060-f0e68ab24de9" run_name="sift_rover-1762553080"
2025-11-07T22:04:40.827748Z  INFO sift_stream::stream::tasks: Sift streaming successfully initialized sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:04:40.827756Z  INFO sift_stream::stream::tasks: backup manager task started sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:04:40.827757Z  INFO sift_stream::stream::tasks: ingestion task started sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:04:40.827852Z  INFO sift_stream::stream::tasks: creating new stream sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:04:40.827867Z  INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:04:40.828270Z  INFO sift_stream::stream::tasks: backup re-ingestion task started sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:04:40.829228Z  INFO sift_stream::stream::mode::ingestion_config: flow 'stats' being ingested for the first time sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:04:40.841585Z  INFO sift_stream::stream::mode::ingestion_config: successfully registered new flow sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750" flow="onboard_sensors"
2025-11-07T22:04:40.842151Z  INFO sift_stream::stream::mode::ingestion_config: flow 'onboard_sensors' being ingested for the first time sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:05:40.829805Z  INFO sift_stream::stream::tasks: checkpoint expired sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:05:40.831565Z  INFO sift_stream::stream::tasks: checkpoint succeeded - data streamed to Sift successfully sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:05:40.831653Z  INFO sift_stream::metrics: checkpoint_count="0" stream_duration="60.0s" messages_processed=120 message_rate="1.9996353193081307 messages/s" bytes_processed="14.1 KiB" byte_rate="242 B/s"
2025-11-07T22:05:40.831707Z  INFO sift_stream::stream::tasks: creating new stream sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:05:40.831733Z  INFO sift_stream::stream::tasks: successfully initialized a new stream to Sift sift_stream_id="f55a3352-20fc-4c83-92d1-a4998d9ca750"
2025-11-07T22:05:40.836140Z  INFO sift_stream::backup::disk::async_manager: reingestion not required, removing backup files for completed checkpoint

Errors returned from SiftStream will be raised as exceptions in python that can be handled
For example, if modifying an already existing flow incorrectly

RuntimeError: [IncompatibleIngestionConfigChange]: flow(s) with name 'test-sift-client-ingestion-flow' exist but their channel configs do not match what the user specified

[cause]:
   - incompatible change to ingestion config

[help]:
   - Did you modify an existing flow? Try updating the the flow's name or the 'client_key' of `sift_stream::IngestionConfigForm`

@ian-sift
Copy link
Copy Markdown
Contributor

ian-sift commented Nov 7, 2025

General thoughts while you work on tests etc:

  • Not seen in your example posted in chat, but would be nice to have an example of adding new flows after initial config since I can imagine cases where you'd not know about all flows a priori or in one place/may be programmatically adding flows upon receiving new telem messages
  • Interpreting/Casting helpers: Similarly, would be nice to just send a flow w/ new values i.e. not have to re-declare flow_name every time and potentially either send an ordered list or values or a dict of <channel_name, value> pairs.
    • Based on the flow config, we should be able to cast the values to rust types appropriately (not have users need to cast ValuePy.Double(value) etc themselves as I've seen this lead them to create boilerplate wrappers just to help invoking our ingest()/send() methods.
    • And the default timestamp could just be TimeValuePy() so they don't have to specify if they don't want (and when they do, pass like a datetime and not have to know to call to_rust_timestamp() but maybe your _to_rust_form handling for Flow object solves avoiding them having to figure out which helpers to call)
  • Perf: All for reducing complexity of threading but still want to confirm: is there some plan for the underlying Rust to be managing threading if load is high or is it just performant enough that it never really deals with throughput issues?

@nathan-sift
Copy link
Copy Markdown
Contributor Author

General thoughts while you work on tests etc:

  • Not seen in your example posted in chat, but would be nice to have an example of adding new flows after initial config since I can imagine cases where you'd not know about all flows a priori or in one place/may be programmatically adding flows upon receiving new telem messages

  • Interpreting/Casting helpers: Similarly, would be nice to just send a flow w/ new values i.e. not have to re-declare flow_name every time and potentially either send an ordered list or values or a dict of <channel_name, value> pairs.

    • Based on the flow config, we should be able to cast the values to rust types appropriately (not have users need to cast ValuePy.Double(value) etc themselves as I've seen this lead them to create boilerplate wrappers just to help invoking our ingest()/send() methods.
    • And the default timestamp could just be TimeValuePy() so they don't have to specify if they don't want (and when they do, pass like a datetime and not have to know to call to_rust_timestamp() but maybe your _to_rust_form handling for Flow object solves avoiding them having to figure out which helpers to call)
  • Perf: All for reducing complexity of threading but still want to confirm: is there some plan for the underlying Rust to be managing threading if load is high or is it just performant enough that it never really deals with throughput issues?

  • Good call out. It wasn't in my latest test example (see the current description). I'll make sure all potential methods are used in testing and the example
  • This is more user friendly in my current example. It's still not perfect, I'll think of a few ideas here early today. But ultimately, we do need to ensure we pass the value as the correct type (not int, but int64 for example). That information either needs to be provided directly by the user, or indirectly through info already known by the program (aka, using the flow/ingestion config)
  • That's a good idea as a default timestamp. I'll add that
  • We need more benchmarking, but the SiftStream performance is excellent. More info in Slack.

Comment thread python/lib/sift_client/resources/ingestion.py Outdated
Comment thread python/lib/sift_client/_internal/low_level_wrappers/ingestion.py Outdated
Comment thread python/lib/sift_client/_internal/low_level_wrappers/ingestion.py Outdated
Comment thread python/lib/sift_client/_internal/low_level_wrappers/ingestion.py
Comment thread python/lib/sift_client/resources/ingestion.py
Comment thread python/lib/sift_client/resources/ingestion.py
Comment thread python/lib/sift_client/sift_types/ingestion.py Outdated
Comment thread python/lib/sift_client/sift_types/ingestion.py Outdated
Comment thread python/lib/sift_client/sift_types/ingestion.py Outdated
Comment thread python/lib/sift_client/sift_types/ingestion.py
@marc-sift
Copy link
Copy Markdown
Contributor

Does this with in without async as well?

@nathan-sift
Copy link
Copy Markdown
Contributor Author

Does this with in without async as well?

I talked with Luck about this. For now, no. We can add that in once a customer asks for it, but in general, we expect async to be faster, so we should steer customers in that direction.

@nathan-sift nathan-sift force-pushed the python/sift-client-ingestion branch 2 times, most recently from f35d263 to 946fd7b Compare November 11, 2025 02:01
@nathan-sift nathan-sift marked this pull request as ready for review November 11, 2025 02:12
Copy link
Copy Markdown
Collaborator

@alexluck-sift alexluck-sift left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks very good! A few comments before merge

Comment thread python/docs/examples/ingestion_example.py Outdated
Comment thread python/lib/sift_client/resources/ingestion.py Outdated
Comment thread python/lib/sift_client/resources/ingestion.py Outdated
Comment thread python/lib/sift_client/sift_types/ingestion.py
Comment thread python/lib/sift_client/resources/ingestion.py
@nathan-sift nathan-sift removed the request for review from marc-sift November 11, 2025 04:41
@nathan-sift nathan-sift merged commit 753fabe into main Nov 11, 2025
15 checks passed
@nathan-sift nathan-sift deleted the python/sift-client-ingestion branch November 11, 2025 04:42
nathan-sift added a commit that referenced this pull request Nov 11, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants