From 4ee905cbac8634f2220d423164683fa7cc14eab9 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Thu, 29 Jan 2026 00:14:48 -0500 Subject: [PATCH 01/15] version after second pass remove main renamed some functions up to date set subprocess path --- sentry_streams/sentry_streams/config.json | 12 ++ sentry_streams/sentry_streams/runner.py | 88 ++++++++--- sentry_streams/src/run.rs | 55 ++++--- sentry_streams/tests/pipeline/test_chain.py | 3 +- .../tests/pipeline/test_validation.py | 68 -------- sentry_streams/tests/test_load_runtime.py | 148 ++++++++++++++++++ sentry_streams/tests/test_runner.py | 22 +-- 7 files changed, 260 insertions(+), 136 deletions(-) delete mode 100644 sentry_streams/tests/pipeline/test_validation.py create mode 100644 sentry_streams/tests/test_load_runtime.py diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json index b5486543..63ab043f 100644 --- a/sentry_streams/sentry_streams/config.json +++ b/sentry_streams/sentry_streams/config.json @@ -13,6 +13,9 @@ }, "metrics": { "$ref": "#/definitions/Metrics" + }, + "streaming_platform_config": { + "$ref": "#/definitions/StreamingPlatformConfig" } } }, @@ -75,6 +78,15 @@ "required": ["type"] } ] + }, + "StreamingPlatformConfig": { + "type": "object", + "properties": { + "dsn": { + "type": "string" + } + }, + "required": ["dsn"] } } } diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index 329420bc..f3e323e8 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -1,10 +1,13 @@ import importlib import json import logging -from typing import Any, Optional, cast +import multiprocessing +import sys +from typing import Any, Mapping, Optional, cast import click import jsonschema +import sentry_sdk import yaml from sentry_streams.adapters.loader import load_adapter @@ -60,13 +63,36 @@ def iterate_edges( step_streams[branch_name] = next_step_stream[branch_name] +def _load_pipeline(application: str) -> Pipeline[Any]: + """ + Worker function that runs in a separate process to load the pipeline. + Returns the Pipeline object directly, or raises an exception on error. + + Customer code exceptions are allowed to propagate naturally so that the customer's + Sentry SDK (if initialized) can capture them. + """ + import contextlib + + pipeline_globals: dict[str, Any] = {} + + with contextlib.redirect_stdout(sys.stderr): + with open(application, "r") as f: + exec(f.read(), pipeline_globals) + + if "pipeline" not in pipeline_globals: + raise ValueError("Application file must define a 'pipeline' variable") + + pipeline = cast(Pipeline[Any], pipeline_globals["pipeline"]) + return pipeline + + def load_runtime( name: str, log_level: str, adapter: str, - config: str, segment_id: Optional[str], application: str, + environment_config: Mapping[str, Any], ) -> Any: logging.basicConfig( @@ -74,23 +100,10 @@ def load_runtime( format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) - - pipeline_globals: dict[str, Any] = {} - - with open(application) as f: - exec(f.read(), pipeline_globals) - - with open(config, "r") as config_file: - environment_config = yaml.safe_load(config_file) - - config_template = importlib.resources.files("sentry_streams") / "config.json" - with config_template.open("r") as file: - schema = json.load(file) - - try: - jsonschema.validate(environment_config, schema) - except Exception: - raise + with multiprocessing.Pool(processes=1) as pool: + pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,)) + logger.info("Successfully loaded pipeline from subprocess") + validate_all_branches_have_sinks(pipeline) metric_config = environment_config.get("metrics", {}) if metric_config.get("type") == "datadog": @@ -114,8 +127,6 @@ def load_runtime( metric_config = {} assigned_segment_id = int(segment_id) if segment_id else None - pipeline: Pipeline[Any] = pipeline_globals["pipeline"] - validate_all_branches_have_sinks(pipeline) runtime: Any = load_adapter(adapter, environment_config, assigned_segment_id, metric_config) translator = RuntimeTranslator(runtime) @@ -124,6 +135,34 @@ def load_runtime( return runtime +def run_with_config_file( + name: str, + log_level: str, + adapter: str, + config: str, + segment_id: Optional[str], + application: str, +) -> None: + with open(config, "r") as f: + environment_config = yaml.safe_load(f) + + config_template = importlib.resources.files("sentry_streams") / "config.json" + with config_template.open("r") as file: + schema = json.load(file) + + try: + jsonschema.validate(environment_config, schema) + except Exception: + raise + + streaming_platform_config = environment_config.get("streaming_platform_config") + if streaming_platform_config: + sentry_sdk.init(dsn=streaming_platform_config["dsn"]) + + runtime = load_runtime(name, log_level, adapter, segment_id, application, environment_config) + runtime.run() + + @click.command() @click.option( "--name", @@ -171,7 +210,7 @@ def load_runtime( "application", required=True, ) -def main( +def run_with_cli( name: str, log_level: str, adapter: str, @@ -179,9 +218,8 @@ def main( segment_id: Optional[str], application: str, ) -> None: - runtime = load_runtime(name, log_level, adapter, config, segment_id, application) - runtime.run() + run_with_config_file(name, log_level, adapter, config, segment_id, application) if __name__ == "__main__": - main() + run_with_cli() diff --git a/sentry_streams/src/run.rs b/sentry_streams/src/run.rs index b042cd2c..7ad853e2 100644 --- a/sentry_streams/src/run.rs +++ b/sentry_streams/src/run.rs @@ -37,24 +37,24 @@ pub struct RuntimeConfig { #[arg(short, long)] pub name: String, - /// The name of the Sentry Streams application + /// The logging level #[arg(short, long)] pub log_level: String, - /// The name of the adapter + /// The stream adapter to use (e.g., "arroyo", "rust_arroyo") #[arg(short, long)] - pub adapter_name: String, + pub adapter: String, /// The deployment config file path. Each config file currently corresponds to a specific pipeline. #[arg(short, long)] - pub config_file: OsString, + pub config: OsString, /// The segment id to run the pipeline for #[arg(short, long)] pub segment_id: Option, - /// The name of the application - pub application_name: String, + /// The application file path + pub application: String, } pub fn run(args: Args) -> Result<(), Box> { @@ -63,6 +63,19 @@ pub fn run(args: Args) -> Result<(), Box> { runtime_config, } = args; + // Set PYTHONPATH environment variable if module_paths are provided + // This ensures subprocesses spawned via multiprocessing inherit the paths + if let Some(ref module_paths) = py_config.module_paths { + let separator = if cfg!(windows) { ";" } else { ":" }; + let path_string = module_paths + .iter() + .filter_map(|p| p.to_str()) + .collect::>() + .join(separator); + + std::env::set_var("PYTHONPATH", path_string); + } + traced_with_gil!(|py| -> PyResult<()> { if let Some(exec_path) = py_config.exec_path { PyModule::import(py, "sys")?.setattr("executable", exec_path)?; @@ -70,31 +83,21 @@ pub fn run(args: Args) -> Result<(), Box> { if let Some(module_paths) = py_config.module_paths { PyModule::import(py, "sys")?.setattr("path", module_paths)?; } - Ok(()) - })?; - - let runtime: Py = traced_with_gil!(|py| { - let runtime = py - .import("sentry_streams.runner")? - .getattr("load_runtime")? + py.import("sentry_streams.runner")? + .getattr("run_with_config_file")? .call1(( runtime_config.name, runtime_config.log_level, - runtime_config.adapter_name, - runtime_config.config_file, + runtime_config.adapter, + runtime_config.config.to_str().ok_or_else(|| { + pyo3::exceptions::PyValueError::new_err("Invalid config file path") + })?, runtime_config.segment_id, - runtime_config.application_name, - ))? - .unbind(); - PyResult::Ok(runtime) - })?; + runtime_config.application, + ))?; - traced_with_gil!(|py| { - runtime - .bind(py) - .call_method0("run") - .expect("Unable to start runtime"); - }); + Ok(()) + })?; Ok(()) } diff --git a/sentry_streams/tests/pipeline/test_chain.py b/sentry_streams/tests/pipeline/test_chain.py index 6a5a7c74..4c8a8468 100644 --- a/sentry_streams/tests/pipeline/test_chain.py +++ b/sentry_streams/tests/pipeline/test_chain.py @@ -7,7 +7,6 @@ from sentry_streams.pipeline.pipeline import ( Batch, ComplexStep, - DevNullSink, Filter, FlatMap, Map, @@ -182,7 +181,7 @@ def test_router() -> None: def test_register_steps(step: Union[Transform[Any, Any], ComplexStep[Any, Any]]) -> None: name = step.name pipeline = Pipeline(StreamSource(name="mysource", stream_name="name")) - pipeline.apply(step).sink(DevNullSink("test_sink")) + pipeline.apply(step) assert pipeline.steps[name] == step assert pipeline.steps[name].name == name assert pipeline.incoming_edges[name] == ["mysource"] diff --git a/sentry_streams/tests/pipeline/test_validation.py b/sentry_streams/tests/pipeline/test_validation.py deleted file mode 100644 index 47456682..00000000 --- a/sentry_streams/tests/pipeline/test_validation.py +++ /dev/null @@ -1,68 +0,0 @@ -from typing import Any - -import pytest - -from sentry_streams.pipeline.exception import InvalidPipelineError -from sentry_streams.pipeline.pipeline import ( - Map, - StreamSink, - branch, - streaming_source, -) -from sentry_streams.pipeline.validation import validate_all_branches_have_sinks - - -def test_valid_pipeline_with_router() -> None: - """Test that a pipeline with router and all branches having sinks passes validation.""" - - def routing_func(msg: Any) -> str: - return "route1" - - pipeline = ( - streaming_source("myinput", "events") - .apply(Map("transform1", lambda msg: msg)) - .route( - "route_to_one", - routing_function=routing_func, - routing_table={ - "route1": branch("route1") - .apply(Map("transform2", lambda msg: msg)) - .sink(StreamSink("myoutput1", stream_name="transformed-events-2")), - "route2": branch("route2") - .apply(Map("transform3", lambda msg: msg)) - .sink(StreamSink("myoutput2", stream_name="transformed-events-3")), - }, - ) - ) - - # Should not raise - validate_all_branches_have_sinks(pipeline) - - -def test_invalid_pipeline_with_router_missing_sink() -> None: - """Test that a pipeline with router where one branch is missing a sink fails validation.""" - - def routing_func(msg: Any) -> str: - return "route1" - - pipeline = ( - streaming_source("myinput", "events") - .apply(Map("transform1", lambda msg: msg)) - .route( - "route_to_one", - routing_function=routing_func, - routing_table={ - "route1": branch("route1") - .apply(Map("transform2", lambda msg: msg)) - .sink(StreamSink("myoutput1", stream_name="transformed-events-2")), - "route2": branch("route2").apply(Map("transform3", lambda msg: msg)), - # Missing sink on route2 - }, - ) - ) - - with pytest.raises(InvalidPipelineError) as exc_info: - validate_all_branches_have_sinks(pipeline) - - assert "transform3" in str(exc_info.value) - assert "must terminate with a Sink step" in str(exc_info.value) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py new file mode 100644 index 00000000..2792d3ac --- /dev/null +++ b/sentry_streams/tests/test_load_runtime.py @@ -0,0 +1,148 @@ +from typing import Any, Generator, List, Optional +from unittest.mock import patch + +import pytest +import sentry_sdk +from sentry_sdk.transport import Transport + +from sentry_streams.pipeline.pipeline import Pipeline +from sentry_streams.runner import load_runtime + + +class CaptureTransport(Transport): + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.events: List[Any] = [] + self.envelopes: List[Any] = [] + + def capture_event(self, event: Any) -> None: + self.events.append(event) + return None + + def capture_envelope(self, envelope: Any) -> None: + self.envelopes.append(envelope) + return None + + def flush(self, timeout: float, callback: Optional[Any] = None) -> None: + """Flush is called when SDK shuts down.""" + pass + + +@pytest.fixture +def temp_fixture_dir(tmp_path: Any) -> Any: + fixture_dir = tmp_path / "fixtures" + fixture_dir.mkdir() + return fixture_dir + + +@pytest.fixture(autouse=True) +def reset_metrics_backend() -> Generator[None, None, None]: + """Reset the global metrics backend between tests.""" + from sentry_streams import metrics + + try: + from arroyo.utils import metrics as arroyo_metrics + + has_arroyo = True + except ImportError: + has_arroyo = False + + # Reset before each test + metrics.metrics._metrics_backend = None + if has_arroyo: + arroyo_metrics._metrics_backend = None + + yield + + # Reset to None after each test + metrics.metrics._metrics_backend = None + if has_arroyo: + arroyo_metrics._metrics_backend = None + + +@pytest.fixture +def platform_transport() -> CaptureTransport: + transport = CaptureTransport() + # Clear any existing Sentry client + sentry_sdk.get_client().close() + return transport + + +def test_multiprocess_pipe_communication_success( + platform_transport: CaptureTransport, temp_fixture_dir: Any +) -> None: + sentry_sdk.init( + dsn="https://platform@example.com/456", + transport=platform_transport, + ) + + app_file = temp_fixture_dir / "simple_app.py" + app_file.write_text( + """ +from sentry_streams.pipeline import streaming_source +pipeline = streaming_source(name="test", stream_name="test-stream") +""" + ) + + with ( + patch("sentry_streams.runner.load_adapter") as mock_load_adapter, + patch("sentry_streams.runner.iterate_edges") as mock_iterate_edges, + ): + mock_runtime = type( + "MockRuntime", + (), + { + "run": lambda self: None, + "source": lambda self, step: "mock_stream", + "complex_step_override": lambda self: {}, + }, + )() + mock_load_adapter.return_value = mock_runtime + + runtime = load_runtime( + name="test", + log_level="INFO", + adapter="arroyo", + segment_id=None, + application=str(app_file), + environment_config={"metrics": {"type": "dummy"}}, + ) + + assert runtime is not None + + mock_iterate_edges.assert_called_once() + pipeline_arg = mock_iterate_edges.call_args[0][0] # First positional argument + assert isinstance(pipeline_arg, Pipeline) + + +def test_subprocess_sends_error_status_with_details( + platform_transport: CaptureTransport, temp_fixture_dir: Any +) -> None: + """Test that detailed error messages are captured when subprocess sends status='error'.""" + sentry_sdk.init( + dsn="https://platform@example.com/456", + transport=platform_transport, + ) + + # Create an app file that doesn't define 'pipeline' variable + app_file = temp_fixture_dir / "missing_pipeline.py" + app_file.write_text( + """ +from sentry_streams.pipeline import streaming_source +# Intentionally not defining 'pipeline' variable +my_pipeline = streaming_source(name="test", stream_name="test-stream") +""" + ) + + with pytest.raises(ValueError) as exc_info: + load_runtime( + name="test", + log_level="INFO", + adapter="arroyo", + segment_id=None, + application=str(app_file), + environment_config={"metrics": {"type": "dummy"}}, + ) + + assert "Application file must define a 'pipeline' variable" in str(exc_info.value) diff --git a/sentry_streams/tests/test_runner.py b/sentry_streams/tests/test_runner.py index ce0c0807..7be76c41 100644 --- a/sentry_streams/tests/test_runner.py +++ b/sentry_streams/tests/test_runner.py @@ -8,7 +8,6 @@ from sentry_streams.dummy.dummy_adapter import DummyAdapter from sentry_streams.pipeline import Filter, Map, branch, streaming_source from sentry_streams.pipeline.pipeline import ( - DevNullSink, Pipeline, ) from sentry_streams.runner import iterate_edges @@ -28,20 +27,16 @@ def create_pipeline() -> Pipeline[bytes]: "router1", routing_function=lambda x: RouterBranch.BRANCH1.value, routing_table={ - RouterBranch.BRANCH1.value: branch("map4_segment") - .apply(Map("map4", function=lambda x: x.payload)) - .sink(DevNullSink("sink_map4")), - RouterBranch.BRANCH2.value: branch("map5_segment") - .apply(Map("map5", function=lambda x: x.payload)) - .sink(DevNullSink("sink_map5")), + RouterBranch.BRANCH1.value: branch("map4_segment").apply( + Map("map4", function=lambda x: x.payload) + ), + RouterBranch.BRANCH2.value: branch("map5_segment").apply( + Map("map5", function=lambda x: x.payload) + ), }, ) ) - broadcast_branch_2 = ( - branch("branch2") - .apply(Map("map3", function=lambda x: x.payload)) - .sink(DevNullSink("sink_map3")) - ) + broadcast_branch_2 = branch("branch2").apply(Map("map3", function=lambda x: x.payload)) test_pipeline = ( streaming_source("source1", stream_name="foo") @@ -72,11 +67,8 @@ def test_iterate_edges(create_pipeline: Pipeline[bytes]) -> None: "map2", "map3", "router1", - "sink_map3", "map4", "map5", - "sink_map4", - "sink_map5", ] assert runtime.branches == [ "branch1", From 3f12aa627d68d27e0bc7f25ee3dd2f17094e80e8 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Thu, 29 Jan 2026 17:51:08 -0500 Subject: [PATCH 02/15] revert wrong commits --- sentry_streams/tests/pipeline/test_chain.py | 3 +- .../tests/pipeline/test_validation.py | 68 +++++++++++++++++++ sentry_streams/tests/test_runner.py | 22 ++++-- 3 files changed, 85 insertions(+), 8 deletions(-) create mode 100644 sentry_streams/tests/pipeline/test_validation.py diff --git a/sentry_streams/tests/pipeline/test_chain.py b/sentry_streams/tests/pipeline/test_chain.py index 4c8a8468..6a5a7c74 100644 --- a/sentry_streams/tests/pipeline/test_chain.py +++ b/sentry_streams/tests/pipeline/test_chain.py @@ -7,6 +7,7 @@ from sentry_streams.pipeline.pipeline import ( Batch, ComplexStep, + DevNullSink, Filter, FlatMap, Map, @@ -181,7 +182,7 @@ def test_router() -> None: def test_register_steps(step: Union[Transform[Any, Any], ComplexStep[Any, Any]]) -> None: name = step.name pipeline = Pipeline(StreamSource(name="mysource", stream_name="name")) - pipeline.apply(step) + pipeline.apply(step).sink(DevNullSink("test_sink")) assert pipeline.steps[name] == step assert pipeline.steps[name].name == name assert pipeline.incoming_edges[name] == ["mysource"] diff --git a/sentry_streams/tests/pipeline/test_validation.py b/sentry_streams/tests/pipeline/test_validation.py new file mode 100644 index 00000000..47456682 --- /dev/null +++ b/sentry_streams/tests/pipeline/test_validation.py @@ -0,0 +1,68 @@ +from typing import Any + +import pytest + +from sentry_streams.pipeline.exception import InvalidPipelineError +from sentry_streams.pipeline.pipeline import ( + Map, + StreamSink, + branch, + streaming_source, +) +from sentry_streams.pipeline.validation import validate_all_branches_have_sinks + + +def test_valid_pipeline_with_router() -> None: + """Test that a pipeline with router and all branches having sinks passes validation.""" + + def routing_func(msg: Any) -> str: + return "route1" + + pipeline = ( + streaming_source("myinput", "events") + .apply(Map("transform1", lambda msg: msg)) + .route( + "route_to_one", + routing_function=routing_func, + routing_table={ + "route1": branch("route1") + .apply(Map("transform2", lambda msg: msg)) + .sink(StreamSink("myoutput1", stream_name="transformed-events-2")), + "route2": branch("route2") + .apply(Map("transform3", lambda msg: msg)) + .sink(StreamSink("myoutput2", stream_name="transformed-events-3")), + }, + ) + ) + + # Should not raise + validate_all_branches_have_sinks(pipeline) + + +def test_invalid_pipeline_with_router_missing_sink() -> None: + """Test that a pipeline with router where one branch is missing a sink fails validation.""" + + def routing_func(msg: Any) -> str: + return "route1" + + pipeline = ( + streaming_source("myinput", "events") + .apply(Map("transform1", lambda msg: msg)) + .route( + "route_to_one", + routing_function=routing_func, + routing_table={ + "route1": branch("route1") + .apply(Map("transform2", lambda msg: msg)) + .sink(StreamSink("myoutput1", stream_name="transformed-events-2")), + "route2": branch("route2").apply(Map("transform3", lambda msg: msg)), + # Missing sink on route2 + }, + ) + ) + + with pytest.raises(InvalidPipelineError) as exc_info: + validate_all_branches_have_sinks(pipeline) + + assert "transform3" in str(exc_info.value) + assert "must terminate with a Sink step" in str(exc_info.value) diff --git a/sentry_streams/tests/test_runner.py b/sentry_streams/tests/test_runner.py index 7be76c41..ce0c0807 100644 --- a/sentry_streams/tests/test_runner.py +++ b/sentry_streams/tests/test_runner.py @@ -8,6 +8,7 @@ from sentry_streams.dummy.dummy_adapter import DummyAdapter from sentry_streams.pipeline import Filter, Map, branch, streaming_source from sentry_streams.pipeline.pipeline import ( + DevNullSink, Pipeline, ) from sentry_streams.runner import iterate_edges @@ -27,16 +28,20 @@ def create_pipeline() -> Pipeline[bytes]: "router1", routing_function=lambda x: RouterBranch.BRANCH1.value, routing_table={ - RouterBranch.BRANCH1.value: branch("map4_segment").apply( - Map("map4", function=lambda x: x.payload) - ), - RouterBranch.BRANCH2.value: branch("map5_segment").apply( - Map("map5", function=lambda x: x.payload) - ), + RouterBranch.BRANCH1.value: branch("map4_segment") + .apply(Map("map4", function=lambda x: x.payload)) + .sink(DevNullSink("sink_map4")), + RouterBranch.BRANCH2.value: branch("map5_segment") + .apply(Map("map5", function=lambda x: x.payload)) + .sink(DevNullSink("sink_map5")), }, ) ) - broadcast_branch_2 = branch("branch2").apply(Map("map3", function=lambda x: x.payload)) + broadcast_branch_2 = ( + branch("branch2") + .apply(Map("map3", function=lambda x: x.payload)) + .sink(DevNullSink("sink_map3")) + ) test_pipeline = ( streaming_source("source1", stream_name="foo") @@ -67,8 +72,11 @@ def test_iterate_edges(create_pipeline: Pipeline[bytes]) -> None: "map2", "map3", "router1", + "sink_map3", "map4", "map5", + "sink_map4", + "sink_map5", ] assert runtime.branches == [ "branch1", From 66d6b4390767848516a4ae2a3305e6f314c89568 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Thu, 29 Jan 2026 17:54:59 -0500 Subject: [PATCH 03/15] add sink to test pipeline --- sentry_streams/tests/test_load_runtime.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index 2792d3ac..eab25ac5 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -80,8 +80,8 @@ def test_multiprocess_pipe_communication_success( app_file = temp_fixture_dir / "simple_app.py" app_file.write_text( """ -from sentry_streams.pipeline import streaming_source -pipeline = streaming_source(name="test", stream_name="test-stream") +from sentry_streams.pipeline import streaming_source, StreamSink +pipeline = streaming_source(name="test", stream_name="test-stream").sink(StreamSink("test-sink", stream_name="test-output")) """ ) From af519f1fb19748eb3f141abf7a4eba744ff5e4e3 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 15:10:00 -0500 Subject: [PATCH 04/15] only change rust runner arg names --- sentry_streams/src/run.rs | 43 ++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/sentry_streams/src/run.rs b/sentry_streams/src/run.rs index 7ad853e2..ddb788b3 100644 --- a/sentry_streams/src/run.rs +++ b/sentry_streams/src/run.rs @@ -33,27 +33,21 @@ pub struct PyConfig { #[derive(Parser, Debug)] pub struct RuntimeConfig { - /// The name of the Sentry Streams application #[arg(short, long)] pub name: String, - /// The logging level #[arg(short, long)] pub log_level: String, - /// The stream adapter to use (e.g., "arroyo", "rust_arroyo") #[arg(short, long)] pub adapter: String, - /// The deployment config file path. Each config file currently corresponds to a specific pipeline. #[arg(short, long)] pub config: OsString, - /// The segment id to run the pipeline for #[arg(short, long)] pub segment_id: Option, - /// The application file path pub application: String, } @@ -63,19 +57,6 @@ pub fn run(args: Args) -> Result<(), Box> { runtime_config, } = args; - // Set PYTHONPATH environment variable if module_paths are provided - // This ensures subprocesses spawned via multiprocessing inherit the paths - if let Some(ref module_paths) = py_config.module_paths { - let separator = if cfg!(windows) { ";" } else { ":" }; - let path_string = module_paths - .iter() - .filter_map(|p| p.to_str()) - .collect::>() - .join(separator); - - std::env::set_var("PYTHONPATH", path_string); - } - traced_with_gil!(|py| -> PyResult<()> { if let Some(exec_path) = py_config.exec_path { PyModule::import(py, "sys")?.setattr("executable", exec_path)?; @@ -83,21 +64,31 @@ pub fn run(args: Args) -> Result<(), Box> { if let Some(module_paths) = py_config.module_paths { PyModule::import(py, "sys")?.setattr("path", module_paths)?; } - py.import("sentry_streams.runner")? + Ok(()) + })?; + + let runtime: Py = traced_with_gil!(|py| { + let runtime = py + .import("sentry_streams.runner")? .getattr("run_with_config_file")? .call1(( runtime_config.name, runtime_config.log_level, runtime_config.adapter, - runtime_config.config.to_str().ok_or_else(|| { - pyo3::exceptions::PyValueError::new_err("Invalid config file path") - })?, + runtime_config.config, runtime_config.segment_id, runtime_config.application, - ))?; - - Ok(()) + ))? + .unbind(); + PyResult::Ok(runtime) })?; + traced_with_gil!(|py| { + runtime + .bind(py) + .call_method0("run") + .expect("Unable to start runtime"); + }); + Ok(()) } From 1bc38a8d79afe96203125e0542e0f284dd5d0d97 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 18:49:41 -0500 Subject: [PATCH 05/15] use transport in test --- sentry_streams/sentry_streams/runner.py | 9 ++- sentry_streams/tests/test_load_runtime.py | 69 +++++++++++++++++------ 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index f3e323e8..8efb8538 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -100,9 +100,12 @@ def load_runtime( format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) - with multiprocessing.Pool(processes=1) as pool: - pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,)) - logger.info("Successfully loaded pipeline from subprocess") + try: + with multiprocessing.Pool(processes=1) as pool: + pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,)) + logger.info("Successfully loaded pipeline from subprocess") + except Exception: + raise validate_all_branches_have_sinks(pipeline) metric_config = environment_config.get("metrics", {}) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index eab25ac5..778ada03 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -6,7 +6,7 @@ from sentry_sdk.transport import Transport from sentry_streams.pipeline.pipeline import Pipeline -from sentry_streams.runner import load_runtime +from sentry_streams.runner import load_runtime, run_with_config_file class CaptureTransport(Transport): @@ -120,29 +120,66 @@ def test_subprocess_sends_error_status_with_details( platform_transport: CaptureTransport, temp_fixture_dir: Any ) -> None: """Test that detailed error messages are captured when subprocess sends status='error'.""" - sentry_sdk.init( - dsn="https://platform@example.com/456", - transport=platform_transport, - ) - # Create an app file that doesn't define 'pipeline' variable app_file = temp_fixture_dir / "missing_pipeline.py" app_file.write_text( """ +import sentry_sdk + +# Initialize customer's Sentry SDK in the subprocess +sentry_sdk.init(dsn="https://customer@example.com/123") + from sentry_streams.pipeline import streaming_source # Intentionally not defining 'pipeline' variable my_pipeline = streaming_source(name="test", stream_name="test-stream") """ ) - with pytest.raises(ValueError) as exc_info: - load_runtime( - name="test", - log_level="INFO", - adapter="arroyo", - segment_id=None, - application=str(app_file), - environment_config={"metrics": {"type": "dummy"}}, - ) + config_file = temp_fixture_dir / "config.yaml" + config_file.write_text( + """ +streaming_platform_config: + dsn: "https://platform@example.com/456" +metrics: + type: dummy +""" + ) - assert "Application file must define a 'pipeline' variable" in str(exc_info.value) + # Patch sentry_sdk.init to use our custom transport + original_init = sentry_sdk.init + error_raised = False + + def custom_init(**kwargs: Any) -> None: + kwargs["transport"] = platform_transport + original_init(**kwargs) + + with patch("sentry_streams.runner.sentry_sdk.init", side_effect=custom_init): + try: + run_with_config_file( + name="test", + log_level="INFO", + adapter="arroyo", + config=str(config_file), + segment_id=None, + application=str(app_file), + ) + except ValueError as e: + error_raised = True + sentry_sdk.capture_exception(e) + sentry_sdk.flush() + assert "Application file must define a 'pipeline' variable" in str(e) + + assert error_raised, "Expected intentiaonl ValueError to be raised" + + assert len(platform_transport.envelopes) > 0, "Error should be captured in platform_transport" + + envelope = platform_transport.envelopes[0] + items = envelope.items + assert len(items) > 0, "Envelope should contain at least one item" + + event_item = items[0] + error_event = event_item.payload.json + + assert "exception" in error_event + error_message = str(error_event["exception"]["values"][0]["value"]) + assert "Application file must define a 'pipeline' variable" in error_message From cb48162515a459fe52304a83f005b2a48ad6aaad Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 18:52:38 -0500 Subject: [PATCH 06/15] rename to sentry_sdk_config --- sentry_streams/sentry_streams/config.json | 24 +++++++++++++++++------ sentry_streams/sentry_streams/runner.py | 6 +++--- sentry_streams/tests/test_load_runtime.py | 2 +- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json index 63ab043f..72c2cef8 100644 --- a/sentry_streams/sentry_streams/config.json +++ b/sentry_streams/sentry_streams/config.json @@ -14,7 +14,7 @@ "metrics": { "$ref": "#/definitions/Metrics" }, - "streaming_platform_config": { + "sentry_sdk_config": { "$ref": "#/definitions/StreamingPlatformConfig" } } @@ -52,7 +52,9 @@ "properties": { "type": { "type": "string", - "enum": ["datadog"] + "enum": [ + "datadog" + ] }, "host": { "type": "string" @@ -65,17 +67,25 @@ "additionalProperties": true } }, - "required": ["type", "host", "port"] + "required": [ + "type", + "host", + "port" + ] }, { "type": "object", "properties": { "type": { "type": "string", - "enum": ["dummy"] + "enum": [ + "dummy" + ] } }, - "required": ["type"] + "required": [ + "type" + ] } ] }, @@ -86,7 +96,9 @@ "type": "string" } }, - "required": ["dsn"] + "required": [ + "dsn" + ] } } } diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index 8efb8538..110380e2 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -158,9 +158,9 @@ def run_with_config_file( except Exception: raise - streaming_platform_config = environment_config.get("streaming_platform_config") - if streaming_platform_config: - sentry_sdk.init(dsn=streaming_platform_config["dsn"]) + sentry_sdk_config = environment_config.get("sentry_sdk_config") + if sentry_sdk_config: + sentry_sdk.init(dsn=sentry_sdk_config["dsn"]) runtime = load_runtime(name, log_level, adapter, segment_id, application, environment_config) runtime.run() diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index 778ada03..583353c4 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -138,7 +138,7 @@ def test_subprocess_sends_error_status_with_details( config_file = temp_fixture_dir / "config.yaml" config_file.write_text( """ -streaming_platform_config: +sentry_sdk_config: dsn: "https://platform@example.com/456" metrics: type: dummy From 5c66cadf76153666185e41974d8348664450144f Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 19:04:07 -0500 Subject: [PATCH 07/15] use dummyadapter --- sentry_streams/tests/test_load_runtime.py | 53 +++++++++-------------- 1 file changed, 21 insertions(+), 32 deletions(-) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index 583353c4..11ea1795 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -5,7 +5,6 @@ import sentry_sdk from sentry_sdk.transport import Transport -from sentry_streams.pipeline.pipeline import Pipeline from sentry_streams.runner import load_runtime, run_with_config_file @@ -80,40 +79,30 @@ def test_multiprocess_pipe_communication_success( app_file = temp_fixture_dir / "simple_app.py" app_file.write_text( """ -from sentry_streams.pipeline import streaming_source, StreamSink -pipeline = streaming_source(name="test", stream_name="test-stream").sink(StreamSink("test-sink", stream_name="test-output")) +from sentry_streams.pipeline import streaming_source +from sentry_streams.pipeline.pipeline import DevNullSink +pipeline = streaming_source(name="test", stream_name="test-stream").sink(DevNullSink("test-sink")) """ ) - with ( - patch("sentry_streams.runner.load_adapter") as mock_load_adapter, - patch("sentry_streams.runner.iterate_edges") as mock_iterate_edges, - ): - mock_runtime = type( - "MockRuntime", - (), - { - "run": lambda self: None, - "source": lambda self, step: "mock_stream", - "complex_step_override": lambda self: {}, - }, - )() - mock_load_adapter.return_value = mock_runtime - - runtime = load_runtime( - name="test", - log_level="INFO", - adapter="arroyo", - segment_id=None, - application=str(app_file), - environment_config={"metrics": {"type": "dummy"}}, - ) - - assert runtime is not None - - mock_iterate_edges.assert_called_once() - pipeline_arg = mock_iterate_edges.call_args[0][0] # First positional argument - assert isinstance(pipeline_arg, Pipeline) + runtime = load_runtime( + name="test", + log_level="INFO", + adapter="dummy", + segment_id=None, + application=str(app_file), + environment_config={"metrics": {"type": "dummy"}}, + ) + + assert runtime is not None + + # Verify that the pipeline was loaded and edges were iterated + # The dummy adapter tracks input streams + from sentry_streams.dummy.dummy_adapter import DummyAdapter + + assert isinstance(runtime, DummyAdapter) + assert "test" in runtime.input_streams + assert "test-sink" in runtime.input_streams def test_subprocess_sends_error_status_with_details( From deb4dae7bde2039f062aaec5476a8c2a13863ead Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 19:05:44 -0500 Subject: [PATCH 08/15] remove unused --- sentry_streams/tests/test_load_runtime.py | 27 +---------------------- 1 file changed, 1 insertion(+), 26 deletions(-) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index 11ea1795..893c5011 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -1,4 +1,4 @@ -from typing import Any, Generator, List, Optional +from typing import Any, List, Optional from unittest.mock import patch import pytest @@ -35,31 +35,6 @@ def temp_fixture_dir(tmp_path: Any) -> Any: return fixture_dir -@pytest.fixture(autouse=True) -def reset_metrics_backend() -> Generator[None, None, None]: - """Reset the global metrics backend between tests.""" - from sentry_streams import metrics - - try: - from arroyo.utils import metrics as arroyo_metrics - - has_arroyo = True - except ImportError: - has_arroyo = False - - # Reset before each test - metrics.metrics._metrics_backend = None - if has_arroyo: - arroyo_metrics._metrics_backend = None - - yield - - # Reset to None after each test - metrics.metrics._metrics_backend = None - if has_arroyo: - arroyo_metrics._metrics_backend = None - - @pytest.fixture def platform_transport() -> CaptureTransport: transport = CaptureTransport() From f9f08378e5494e97389e4283a3c213d98734d594 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 19:11:41 -0500 Subject: [PATCH 09/15] use real modules --- .../tests/fixtures/missing_pipeline.py | 9 +++++++ sentry_streams/tests/fixtures/simple_app.py | 4 +++ sentry_streams/tests/test_load_runtime.py | 27 +++++-------------- 3 files changed, 19 insertions(+), 21 deletions(-) create mode 100644 sentry_streams/tests/fixtures/missing_pipeline.py create mode 100644 sentry_streams/tests/fixtures/simple_app.py diff --git a/sentry_streams/tests/fixtures/missing_pipeline.py b/sentry_streams/tests/fixtures/missing_pipeline.py new file mode 100644 index 00000000..6489bba1 --- /dev/null +++ b/sentry_streams/tests/fixtures/missing_pipeline.py @@ -0,0 +1,9 @@ +import sentry_sdk + +# Initialize customer's Sentry SDK in the subprocess +sentry_sdk.init(dsn="https://customer@example.com/123") + +from sentry_streams.pipeline import streaming_source + +# Intentionally not defining 'pipeline' variable +my_pipeline = streaming_source(name="test", stream_name="test-stream") diff --git a/sentry_streams/tests/fixtures/simple_app.py b/sentry_streams/tests/fixtures/simple_app.py new file mode 100644 index 00000000..6ac6b302 --- /dev/null +++ b/sentry_streams/tests/fixtures/simple_app.py @@ -0,0 +1,4 @@ +from sentry_streams.pipeline import streaming_source +from sentry_streams.pipeline.pipeline import DevNullSink + +pipeline = streaming_source(name="test", stream_name="test-stream").sink(DevNullSink("test-sink")) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index 893c5011..f9aa9225 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import Any, List, Optional from unittest.mock import patch @@ -7,6 +8,9 @@ from sentry_streams.runner import load_runtime, run_with_config_file +# Path to fixtures directory +FIXTURES_DIR = Path(__file__).parent / "fixtures" + class CaptureTransport(Transport): @@ -51,14 +55,7 @@ def test_multiprocess_pipe_communication_success( transport=platform_transport, ) - app_file = temp_fixture_dir / "simple_app.py" - app_file.write_text( - """ -from sentry_streams.pipeline import streaming_source -from sentry_streams.pipeline.pipeline import DevNullSink -pipeline = streaming_source(name="test", stream_name="test-stream").sink(DevNullSink("test-sink")) -""" - ) + app_file = FIXTURES_DIR / "simple_app.py" runtime = load_runtime( name="test", @@ -85,19 +82,7 @@ def test_subprocess_sends_error_status_with_details( ) -> None: """Test that detailed error messages are captured when subprocess sends status='error'.""" - app_file = temp_fixture_dir / "missing_pipeline.py" - app_file.write_text( - """ -import sentry_sdk - -# Initialize customer's Sentry SDK in the subprocess -sentry_sdk.init(dsn="https://customer@example.com/123") - -from sentry_streams.pipeline import streaming_source -# Intentionally not defining 'pipeline' variable -my_pipeline = streaming_source(name="test", stream_name="test-stream") -""" - ) + app_file = FIXTURES_DIR / "missing_pipeline.py" config_file = temp_fixture_dir / "config.yaml" config_file.write_text( From feee6e39fd4885a80ffe2784bb2cd3ebf1eab7ca Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 19:58:00 -0500 Subject: [PATCH 10/15] clean up clis --- sentry_streams/sentry_streams/runner.py | 33 +++++++++++++++++++---- sentry_streams/src/run.rs | 2 +- sentry_streams/tests/test_load_runtime.py | 26 +++--------------- 3 files changed, 32 insertions(+), 29 deletions(-) diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index 110380e2..0c82663d 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -138,14 +138,15 @@ def load_runtime( return runtime -def run_with_config_file( +def load_runtime_with_config_file( name: str, log_level: str, adapter: str, config: str, segment_id: Optional[str], application: str, -) -> None: +) -> Any: + """Load runtime from a config file path, returning the runtime object without calling run().""" with open(config, "r") as f: environment_config = yaml.safe_load(f) @@ -162,7 +163,29 @@ def run_with_config_file( if sentry_sdk_config: sentry_sdk.init(dsn=sentry_sdk_config["dsn"]) - runtime = load_runtime(name, log_level, adapter, segment_id, application, environment_config) + return load_runtime(name, log_level, adapter, segment_id, application, environment_config) + + +def run_with_config_file( + name: str, + log_level: str, + adapter: str, + config: str, + segment_id: Optional[str], + application: str, +) -> None: + """ + Load runtime from config file and run it. Used by the Python CLI. + + NOTE: This function is separate from load_runtime_with_config_file() for a reason: + - load_runtime_with_config_file() returns the runtime WITHOUT calling .run() + - This allows the Rust CLI (run.rs) to call .run() itself + - Do NOT combine these functions - it would break the Rust CLI which needs to + control when .run() is called + """ + runtime = load_runtime_with_config_file( + name, log_level, adapter, config, segment_id, application + ) runtime.run() @@ -213,7 +236,7 @@ def run_with_config_file( "application", required=True, ) -def run_with_cli( +def main( name: str, log_level: str, adapter: str, @@ -225,4 +248,4 @@ def run_with_cli( if __name__ == "__main__": - run_with_cli() + main() diff --git a/sentry_streams/src/run.rs b/sentry_streams/src/run.rs index ddb788b3..85031758 100644 --- a/sentry_streams/src/run.rs +++ b/sentry_streams/src/run.rs @@ -70,7 +70,7 @@ pub fn run(args: Args) -> Result<(), Box> { let runtime: Py = traced_with_gil!(|py| { let runtime = py .import("sentry_streams.runner")? - .getattr("run_with_config_file")? + .getattr("load_runtime_with_config_file")? .call1(( runtime_config.name, runtime_config.log_level, diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index f9aa9225..e35e1fed 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -32,13 +32,6 @@ def flush(self, timeout: float, callback: Optional[Any] = None) -> None: pass -@pytest.fixture -def temp_fixture_dir(tmp_path: Any) -> Any: - fixture_dir = tmp_path / "fixtures" - fixture_dir.mkdir() - return fixture_dir - - @pytest.fixture def platform_transport() -> CaptureTransport: transport = CaptureTransport() @@ -47,9 +40,7 @@ def platform_transport() -> CaptureTransport: return transport -def test_multiprocess_pipe_communication_success( - platform_transport: CaptureTransport, temp_fixture_dir: Any -) -> None: +def test_multiprocess_pipe_communication_success(platform_transport: CaptureTransport) -> None: sentry_sdk.init( dsn="https://platform@example.com/456", transport=platform_transport, @@ -77,22 +68,11 @@ def test_multiprocess_pipe_communication_success( assert "test-sink" in runtime.input_streams -def test_subprocess_sends_error_status_with_details( - platform_transport: CaptureTransport, temp_fixture_dir: Any -) -> None: +def test_subprocess_sends_error_status_with_details(platform_transport: CaptureTransport) -> None: """Test that detailed error messages are captured when subprocess sends status='error'.""" app_file = FIXTURES_DIR / "missing_pipeline.py" - - config_file = temp_fixture_dir / "config.yaml" - config_file.write_text( - """ -sentry_sdk_config: - dsn: "https://platform@example.com/456" -metrics: - type: dummy -""" - ) + config_file = FIXTURES_DIR / "test_config.yaml" # Patch sentry_sdk.init to use our custom transport original_init = sentry_sdk.init From 98b300ea49be597af72d17acb8fe55dea3e8ba64 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 20:26:02 -0500 Subject: [PATCH 11/15] add file --- sentry_streams/tests/fixtures/test_config.yaml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 sentry_streams/tests/fixtures/test_config.yaml diff --git a/sentry_streams/tests/fixtures/test_config.yaml b/sentry_streams/tests/fixtures/test_config.yaml new file mode 100644 index 00000000..dfc18ab6 --- /dev/null +++ b/sentry_streams/tests/fixtures/test_config.yaml @@ -0,0 +1,4 @@ +sentry_sdk_config: + dsn: "https://platform@example.com/456" +metrics: + type: dummy From 6bf947c96e283c684c3631faa17c243bbea214a0 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 20:36:52 -0500 Subject: [PATCH 12/15] add metrics teardown --- sentry_streams/tests/test_load_runtime.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index e35e1fed..c42cb3fb 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import Any, List, Optional +from typing import Any, Generator, List, Optional from unittest.mock import patch import pytest @@ -40,6 +40,24 @@ def platform_transport() -> CaptureTransport: return transport +@pytest.fixture(autouse=True) +def reset_metrics_backend() -> Generator: + """Reset metrics backend before and after each test. + + This fixture prevents "Metrics is already set" errors when tests run in CI. + The metrics backend uses a global module-level variable that persists across + tests in the same pytest session. Since load_runtime() calls configure_metrics(), + we need to reset the global state to ensure test isolation. + """ + import sentry_streams.metrics.metrics + + # Reset before test runs (setup) + sentry_streams.metrics.metrics._metrics_backend = None + yield + # Reset after test completes (teardown) + sentry_streams.metrics.metrics._metrics_backend = None + + def test_multiprocess_pipe_communication_success(platform_transport: CaptureTransport) -> None: sentry_sdk.init( dsn="https://platform@example.com/456", From 24a9d0acada69f309afc4e2c4c1c0a7d91d62124 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 20:47:39 -0500 Subject: [PATCH 13/15] also teardown arroyo backend --- sentry_streams/tests/test_load_runtime.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index c42cb3fb..532ae609 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -49,13 +49,17 @@ def reset_metrics_backend() -> Generator: tests in the same pytest session. Since load_runtime() calls configure_metrics(), we need to reset the global state to ensure test isolation. """ + import arroyo.utils.metrics + import sentry_streams.metrics.metrics # Reset before test runs (setup) sentry_streams.metrics.metrics._metrics_backend = None + arroyo.utils.metrics._metrics_backend = None yield # Reset after test completes (teardown) sentry_streams.metrics.metrics._metrics_backend = None + arroyo.utils.metrics._metrics_backend = None def test_multiprocess_pipe_communication_success(platform_transport: CaptureTransport) -> None: From 1fa578d4c5b87db542880fe3de3468b50bf85795 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 20:55:54 -0500 Subject: [PATCH 14/15] type checking --- sentry_streams/tests/test_load_runtime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py index 532ae609..36724897 100644 --- a/sentry_streams/tests/test_load_runtime.py +++ b/sentry_streams/tests/test_load_runtime.py @@ -41,7 +41,7 @@ def platform_transport() -> CaptureTransport: @pytest.fixture(autouse=True) -def reset_metrics_backend() -> Generator: +def reset_metrics_backend() -> Generator[None, None, None]: """Reset metrics backend before and after each test. This fixture prevents "Metrics is already set" errors when tests run in CI. From eabbfc3f081f509e8a2811bab57d66793ec2c552 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 30 Jan 2026 21:08:22 -0500 Subject: [PATCH 15/15] remove redundant try --- sentry_streams/sentry_streams/runner.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index 0c82663d..db354689 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -100,12 +100,9 @@ def load_runtime( format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) - try: - with multiprocessing.Pool(processes=1) as pool: - pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,)) - logger.info("Successfully loaded pipeline from subprocess") - except Exception: - raise + with multiprocessing.Pool(processes=1) as pool: + pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,)) + validate_all_branches_have_sinks(pipeline) metric_config = environment_config.get("metrics", {}) @@ -154,10 +151,7 @@ def load_runtime_with_config_file( with config_template.open("r") as file: schema = json.load(file) - try: - jsonschema.validate(environment_config, schema) - except Exception: - raise + jsonschema.validate(environment_config, schema) sentry_sdk_config = environment_config.get("sentry_sdk_config") if sentry_sdk_config: