diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json index b5486543..72c2cef8 100644 --- a/sentry_streams/sentry_streams/config.json +++ b/sentry_streams/sentry_streams/config.json @@ -13,6 +13,9 @@ }, "metrics": { "$ref": "#/definitions/Metrics" + }, + "sentry_sdk_config": { + "$ref": "#/definitions/StreamingPlatformConfig" } } }, @@ -49,7 +52,9 @@ "properties": { "type": { "type": "string", - "enum": ["datadog"] + "enum": [ + "datadog" + ] }, "host": { "type": "string" @@ -62,18 +67,37 @@ "additionalProperties": true } }, - "required": ["type", "host", "port"] + "required": [ + "type", + "host", + "port" + ] }, { "type": "object", "properties": { "type": { "type": "string", - "enum": ["dummy"] + "enum": [ + "dummy" + ] } }, - "required": ["type"] + "required": [ + "type" + ] + } + ] + }, + "StreamingPlatformConfig": { + "type": "object", + "properties": { + "dsn": { + "type": "string" } + }, + "required": [ + "dsn" ] } } diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index 329420bc..db354689 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -1,10 +1,13 @@ import importlib import json import logging -from typing import Any, Optional, cast +import multiprocessing +import sys +from typing import Any, Mapping, Optional, cast import click import jsonschema +import sentry_sdk import yaml from sentry_streams.adapters.loader import load_adapter @@ -60,13 +63,36 @@ def iterate_edges( step_streams[branch_name] = next_step_stream[branch_name] +def _load_pipeline(application: str) -> Pipeline[Any]: + """ + Worker function that runs in a separate process to load the pipeline. + Returns the Pipeline object directly, or raises an exception on error. + + Customer code exceptions are allowed to propagate naturally so that the customer's + Sentry SDK (if initialized) can capture them. 
+ """ + import contextlib + + pipeline_globals: dict[str, Any] = {} + + with contextlib.redirect_stdout(sys.stderr): + with open(application, "r") as f: + exec(f.read(), pipeline_globals) + + if "pipeline" not in pipeline_globals: + raise ValueError("Application file must define a 'pipeline' variable") + + pipeline = cast(Pipeline[Any], pipeline_globals["pipeline"]) + return pipeline + + def load_runtime( name: str, log_level: str, adapter: str, - config: str, segment_id: Optional[str], application: str, + environment_config: Mapping[str, Any], ) -> Any: logging.basicConfig( @@ -74,23 +100,10 @@ def load_runtime( format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) + with multiprocessing.Pool(processes=1) as pool: + pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,)) - pipeline_globals: dict[str, Any] = {} - - with open(application) as f: - exec(f.read(), pipeline_globals) - - with open(config, "r") as config_file: - environment_config = yaml.safe_load(config_file) - - config_template = importlib.resources.files("sentry_streams") / "config.json" - with config_template.open("r") as file: - schema = json.load(file) - - try: - jsonschema.validate(environment_config, schema) - except Exception: - raise + validate_all_branches_have_sinks(pipeline) metric_config = environment_config.get("metrics", {}) if metric_config.get("type") == "datadog": @@ -114,8 +127,6 @@ def load_runtime( metric_config = {} assigned_segment_id = int(segment_id) if segment_id else None - pipeline: Pipeline[Any] = pipeline_globals["pipeline"] - validate_all_branches_have_sinks(pipeline) runtime: Any = load_adapter(adapter, environment_config, assigned_segment_id, metric_config) translator = RuntimeTranslator(runtime) @@ -124,6 +135,54 @@ def load_runtime( return runtime +def load_runtime_with_config_file( + name: str, + log_level: str, + adapter: str, + config: str, + segment_id: Optional[str], + application: str, +) -> Any: + """Load runtime from a 
config file path, returning the runtime object without calling run().""" + with open(config, "r") as f: + environment_config = yaml.safe_load(f) + + config_template = importlib.resources.files("sentry_streams") / "config.json" + with config_template.open("r") as file: + schema = json.load(file) + + jsonschema.validate(environment_config, schema) + + sentry_sdk_config = environment_config.get("sentry_sdk_config") + if sentry_sdk_config: + sentry_sdk.init(dsn=sentry_sdk_config["dsn"]) + + return load_runtime(name, log_level, adapter, segment_id, application, environment_config) + + +def run_with_config_file( + name: str, + log_level: str, + adapter: str, + config: str, + segment_id: Optional[str], + application: str, +) -> None: + """ + Load runtime from config file and run it. Used by the Python CLI. + + NOTE: This function is separate from load_runtime_with_config_file() for a reason: + - load_runtime_with_config_file() returns the runtime WITHOUT calling .run() + - This allows the Rust CLI (run.rs) to call .run() itself + - Do NOT combine these functions - it would break the Rust CLI which needs to + control when .run() is called + """ + runtime = load_runtime_with_config_file( + name, log_level, adapter, config, segment_id, application + ) + runtime.run() + + @click.command() @click.option( "--name", @@ -179,8 +238,7 @@ def main( segment_id: Optional[str], application: str, ) -> None: - runtime = load_runtime(name, log_level, adapter, config, segment_id, application) - runtime.run() + run_with_config_file(name, log_level, adapter, config, segment_id, application) if __name__ == "__main__": diff --git a/sentry_streams/src/run.rs b/sentry_streams/src/run.rs index b042cd2c..85031758 100644 --- a/sentry_streams/src/run.rs +++ b/sentry_streams/src/run.rs @@ -33,28 +33,22 @@ pub struct PyConfig { #[derive(Parser, Debug)] pub struct RuntimeConfig { - /// The name of the Sentry Streams application #[arg(short, long)] pub name: String, - /// The name of the Sentry Streams 
application
     #[arg(short, long)]
     pub log_level: String,
 
-    /// The name of the adapter
     #[arg(short, long)]
-    pub adapter_name: String,
+    pub adapter: String,
 
-    /// The deployment config file path. Each config file currently corresponds to a specific pipeline.
     #[arg(short, long)]
-    pub config_file: OsString,
+    pub config: OsString,
 
-    /// The segment id to run the pipeline for
     #[arg(short, long)]
     pub segment_id: Option<String>,
 
-    /// The name of the application
-    pub application_name: String,
+    pub application: String,
 }
 
 pub fn run(args: Args) -> Result<(), Box<dyn Error>> {
@@ -76,14 +70,14 @@ pub fn run(args: Args) -> Result<(), Box<dyn Error>> {
     let runtime: Py<PyAny> = traced_with_gil!(|py| {
         let runtime = py
             .import("sentry_streams.runner")?
-            .getattr("load_runtime")?
+            .getattr("load_runtime_with_config_file")?
            .call1((
                 runtime_config.name,
                 runtime_config.log_level,
-                runtime_config.adapter_name,
-                runtime_config.config_file,
+                runtime_config.adapter,
+                runtime_config.config,
                 runtime_config.segment_id,
-                runtime_config.application_name,
+                runtime_config.application,
             ))?
.unbind(); PyResult::Ok(runtime) diff --git a/sentry_streams/tests/fixtures/missing_pipeline.py b/sentry_streams/tests/fixtures/missing_pipeline.py new file mode 100644 index 00000000..6489bba1 --- /dev/null +++ b/sentry_streams/tests/fixtures/missing_pipeline.py @@ -0,0 +1,9 @@ +import sentry_sdk + +# Initialize customer's Sentry SDK in the subprocess +sentry_sdk.init(dsn="https://customer@example.com/123") + +from sentry_streams.pipeline import streaming_source + +# Intentionally not defining 'pipeline' variable +my_pipeline = streaming_source(name="test", stream_name="test-stream") diff --git a/sentry_streams/tests/fixtures/simple_app.py b/sentry_streams/tests/fixtures/simple_app.py new file mode 100644 index 00000000..6ac6b302 --- /dev/null +++ b/sentry_streams/tests/fixtures/simple_app.py @@ -0,0 +1,4 @@ +from sentry_streams.pipeline import streaming_source +from sentry_streams.pipeline.pipeline import DevNullSink + +pipeline = streaming_source(name="test", stream_name="test-stream").sink(DevNullSink("test-sink")) diff --git a/sentry_streams/tests/fixtures/test_config.yaml b/sentry_streams/tests/fixtures/test_config.yaml new file mode 100644 index 00000000..dfc18ab6 --- /dev/null +++ b/sentry_streams/tests/fixtures/test_config.yaml @@ -0,0 +1,4 @@ +sentry_sdk_config: + dsn: "https://platform@example.com/456" +metrics: + type: dummy diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py new file mode 100644 index 00000000..36724897 --- /dev/null +++ b/sentry_streams/tests/test_load_runtime.py @@ -0,0 +1,136 @@ +from pathlib import Path +from typing import Any, Generator, List, Optional +from unittest.mock import patch + +import pytest +import sentry_sdk +from sentry_sdk.transport import Transport + +from sentry_streams.runner import load_runtime, run_with_config_file + +# Path to fixtures directory +FIXTURES_DIR = Path(__file__).parent / "fixtures" + + +class CaptureTransport(Transport): + + def __init__(self, *args: 
Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.events: List[Any] = [] + self.envelopes: List[Any] = [] + + def capture_event(self, event: Any) -> None: + self.events.append(event) + return None + + def capture_envelope(self, envelope: Any) -> None: + self.envelopes.append(envelope) + return None + + def flush(self, timeout: float, callback: Optional[Any] = None) -> None: + """Flush is called when SDK shuts down.""" + pass + + +@pytest.fixture +def platform_transport() -> CaptureTransport: + transport = CaptureTransport() + # Clear any existing Sentry client + sentry_sdk.get_client().close() + return transport + + +@pytest.fixture(autouse=True) +def reset_metrics_backend() -> Generator[None, None, None]: + """Reset metrics backend before and after each test. + + This fixture prevents "Metrics is already set" errors when tests run in CI. + The metrics backend uses a global module-level variable that persists across + tests in the same pytest session. Since load_runtime() calls configure_metrics(), + we need to reset the global state to ensure test isolation. 
+ """ + import arroyo.utils.metrics + + import sentry_streams.metrics.metrics + + # Reset before test runs (setup) + sentry_streams.metrics.metrics._metrics_backend = None + arroyo.utils.metrics._metrics_backend = None + yield + # Reset after test completes (teardown) + sentry_streams.metrics.metrics._metrics_backend = None + arroyo.utils.metrics._metrics_backend = None + + +def test_multiprocess_pipe_communication_success(platform_transport: CaptureTransport) -> None: + sentry_sdk.init( + dsn="https://platform@example.com/456", + transport=platform_transport, + ) + + app_file = FIXTURES_DIR / "simple_app.py" + + runtime = load_runtime( + name="test", + log_level="INFO", + adapter="dummy", + segment_id=None, + application=str(app_file), + environment_config={"metrics": {"type": "dummy"}}, + ) + + assert runtime is not None + + # Verify that the pipeline was loaded and edges were iterated + # The dummy adapter tracks input streams + from sentry_streams.dummy.dummy_adapter import DummyAdapter + + assert isinstance(runtime, DummyAdapter) + assert "test" in runtime.input_streams + assert "test-sink" in runtime.input_streams + + +def test_subprocess_sends_error_status_with_details(platform_transport: CaptureTransport) -> None: + """Test that detailed error messages are captured when subprocess sends status='error'.""" + + app_file = FIXTURES_DIR / "missing_pipeline.py" + config_file = FIXTURES_DIR / "test_config.yaml" + + # Patch sentry_sdk.init to use our custom transport + original_init = sentry_sdk.init + error_raised = False + + def custom_init(**kwargs: Any) -> None: + kwargs["transport"] = platform_transport + original_init(**kwargs) + + with patch("sentry_streams.runner.sentry_sdk.init", side_effect=custom_init): + try: + run_with_config_file( + name="test", + log_level="INFO", + adapter="arroyo", + config=str(config_file), + segment_id=None, + application=str(app_file), + ) + except ValueError as e: + error_raised = True + sentry_sdk.capture_exception(e) + 
sentry_sdk.flush()
+            assert "Application file must define a 'pipeline' variable" in str(e)
+
+    assert error_raised, "Expected intentional ValueError to be raised"
+
+    assert len(platform_transport.envelopes) > 0, "Error should be captured in platform_transport"
+
+    envelope = platform_transport.envelopes[0]
+    items = envelope.items
+    assert len(items) > 0, "Envelope should contain at least one item"
+
+    event_item = items[0]
+    error_event = event_item.payload.json
+
+    assert "exception" in error_event
+    error_message = str(error_event["exception"]["values"][0]["value"])
+    assert "Application file must define a 'pipeline' variable" in error_message