From e3439fff377f1e5ec3b5b2f118de604b63f5fae4 Mon Sep 17 00:00:00 2001 From: Kyle Barnette Date: Sat, 2 May 2026 22:35:30 -0500 Subject: [PATCH] feat(streaming): add shared sidecar utility helpers Introduce reusable Sugar Glider sidecar helper functions for grpc address resolution, float env parsing, and sidecar payload normalization to reduce duplicate integration code in downstream services. Co-authored-by: Cursor --- src/deepiri_modelkit/streaming/__init__.py | 9 ++- .../streaming/sidecar_utils.py | 79 +++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 src/deepiri_modelkit/streaming/sidecar_utils.py diff --git a/src/deepiri_modelkit/streaming/__init__.py b/src/deepiri_modelkit/streaming/__init__.py index b498d7b..f6beb86 100644 --- a/src/deepiri_modelkit/streaming/__init__.py +++ b/src/deepiri_modelkit/streaming/__init__.py @@ -1,6 +1,13 @@ """Streaming client and utilities""" from .event_stream import StreamingClient +from .sidecar_utils import env_float, resolve_grpc_addr, sidecar_payload_from_fields from .topics import StreamTopics -__all__ = ["StreamingClient", "StreamTopics"] +__all__ = [ + "StreamingClient", + "StreamTopics", + "env_float", + "resolve_grpc_addr", + "sidecar_payload_from_fields", +] diff --git a/src/deepiri_modelkit/streaming/sidecar_utils.py b/src/deepiri_modelkit/streaming/sidecar_utils.py new file mode 100644 index 0000000..3af5322 --- /dev/null +++ b/src/deepiri_modelkit/streaming/sidecar_utils.py @@ -0,0 +1,79 @@ +""" +Shared Sugar Glider/Synapse sidecar helpers. + +These utilities are reused by multiple services (for example Cyrex and Helox) +to keep sidecar transport behavior consistent across repos. +""" + +from __future__ import annotations + +import json +import os +from typing import Any, Callable, Dict, Optional +from urllib.parse import urlparse + + +def env_float(name: str, default: float, logger: Optional[Callable[[str], None]] = None) -> float: + """Read a float env var with safe fallback.""" + raw = os.getenv(name) + if raw is None: + return default + try: + return float(raw) + except ValueError: + if logger is not None: + logger(f"invalid float env {name}={raw!r}; using {default}") + return default + + +def resolve_grpc_addr(base_url: str, explicit_grpc_addr: Optional[str] = None) -> str: + """ + Resolve sidecar gRPC host:port from explicit/env/base URL. + + Resolution order: + 1) explicit_grpc_addr + 2) SYNAPSE_GRPC_ADDR + 3) derive from base_url (8081 -> 50051) + """ + env_addr = os.getenv("SYNAPSE_GRPC_ADDR") + if explicit_grpc_addr: + return explicit_grpc_addr + if env_addr: + return env_addr + + parsed = urlparse(base_url) + if parsed.scheme in {"http", "https"}: + host = parsed.hostname or "localhost" + port = parsed.port + if port is None: + port = 443 if parsed.scheme == "https" else 80 + if port == 8081: + port = 50051 + return f"{host}:{port}" + + if base_url: + return base_url + return "localhost:50051" + + +def sidecar_payload_from_fields(fields: Dict[str, Any]) -> Dict[str, Any]: + """Normalize sidecar event fields to a payload dict.""" + payload = fields.get("payload", {}) + if isinstance(payload, str): + try: + payload = json.loads(payload) + except ValueError: + payload = {} + elif not isinstance(payload, dict): + payload = {} + + if "event" not in payload and fields.get("event_type"): + payload["event"] = fields.get("event_type") + + if "timestamp" not in payload and fields.get("timestamp"): + payload["timestamp"] = fields.get("timestamp") + + if "sender" not in payload and fields.get("sender"): + payload["sender"] = fields.get("sender") + + return payload