Merged
32 changes: 28 additions & 4 deletions sentry_streams/sentry_streams/config.json
@@ -13,6 +13,9 @@
},
"metrics": {
"$ref": "#/definitions/Metrics"
},
"sentry_sdk_config": {
"$ref": "#/definitions/StreamingPlatformConfig"
}
}
},
@@ -49,7 +52,9 @@
"properties": {
"type": {
"type": "string",
"enum": ["datadog"]
"enum": [
"datadog"
]
},
"host": {
"type": "string"
@@ -62,18 +67,37 @@
"additionalProperties": true
}
},
"required": ["type", "host", "port"]
"required": [
"type",
"host",
"port"
]
},
{
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["dummy"]
"enum": [
"dummy"
]
}
},
"required": ["type"]
"required": [
"type"
]
}
]
},
"StreamingPlatformConfig": {
"type": "object",
"properties": {
"dsn": {
"type": "string"
}
},
"required": [
"dsn"
]
}
}
102 changes: 80 additions & 22 deletions sentry_streams/sentry_streams/runner.py
@@ -1,10 +1,13 @@
import importlib
import json
import logging
from typing import Any, Optional, cast
import multiprocessing
import sys
from typing import Any, Mapping, Optional, cast

import click
import jsonschema
import sentry_sdk
import yaml

from sentry_streams.adapters.loader import load_adapter
@@ -60,37 +63,47 @@ def iterate_edges(
step_streams[branch_name] = next_step_stream[branch_name]


def _load_pipeline(application: str) -> Pipeline[Any]:
"""
Worker function that runs in a separate process to load the pipeline.
Returns the Pipeline object directly, or raises an exception on error.

Customer code exceptions are allowed to propagate naturally so that the customer's
Sentry SDK (if initialized) can capture them.
"""
import contextlib

pipeline_globals: dict[str, Any] = {}

with contextlib.redirect_stdout(sys.stderr):
with open(application, "r") as f:
exec(f.read(), pipeline_globals)

if "pipeline" not in pipeline_globals:
raise ValueError("Application file must define a 'pipeline' variable")

pipeline = cast(Pipeline[Any], pipeline_globals["pipeline"])
return pipeline
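
The loader contract above (exec the application file, require a module-level `pipeline` variable, keep customer stdout off the platform's stdout) can be sketched in isolation. The source string and step names below are illustrative stand-ins, not the real pipeline API:

```python
import contextlib
import sys

# Hypothetical customer application source; the real loader reads it from disk.
application_source = "print('customer log line')\npipeline = ['source', 'sink']"

pipeline_globals: dict = {}
with contextlib.redirect_stdout(sys.stderr):
    # Customer prints land on stderr so stdout stays clean for the platform.
    exec(application_source, pipeline_globals)

if "pipeline" not in pipeline_globals:
    raise ValueError("Application file must define a 'pipeline' variable")

pipeline = pipeline_globals["pipeline"]
print(pipeline)  # ['source', 'sink']
```
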
Platform Sentry not initialized in subprocess (Medium Severity)

The platform's Sentry SDK is initialized with the streaming_platform_config DSN in run_with_config_file (main process) but not in the _load_pipeline subprocess where customer code executes. Exceptions raised while loading the pipeline in the subprocess won't be captured by the platform's Sentry; they only propagate as a CalledProcessError to the main process. This contradicts the config schema, which provides platform-level DSN configuration, suggesting platform Sentry should monitor both processes.

def load_runtime(
name: str,
log_level: str,
adapter: str,
config: str,
segment_id: Optional[str],
application: str,
environment_config: Mapping[str, Any],
Rust caller uses incompatible old function signature (High Severity)

The load_runtime function signature changed: the config parameter was removed, environment_config: Mapping[str, Any] was added, and the parameter order shifted. However, the Rust code in run.rs still calls this function with the old positional arguments (name, log_level, adapter_name, config_file, segment_id, application_name). This causes a type mismatch where config_file (a string path) is passed where segment_id is expected, and application_name (a string) is passed where environment_config (a mapping) is expected, breaking the Rust integration entirely.

Rust caller uses outdated load_runtime function signature (High Severity)

The load_runtime function signature changed from (name, log_level, adapter, config, segment_id, application) to (name, log_level, adapter, segment_id, application, environment_config), but the Rust caller in run.rs was not updated. The Rust code passes config_file where segment_id is now expected, segment_id where application is expected, and application_name (a string) where environment_config (a Mapping) is expected. This will crash when environment_config.get("metrics", {}) is called, because strings do not have a .get() method.

) -> Any:

logging.basicConfig(
level=log_level,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
with multiprocessing.Pool(processes=1) as pool:
pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))
Pipeline with lambdas fails subprocess pickling (High Severity)

Using multiprocessing.Pool.apply() to load the pipeline in a subprocess requires pickling the Pipeline object to return it to the main process. Python's standard pickle cannot serialize lambda functions. The existing billing.py example uses aggregate_func=lambda: OutcomesBuffer(), and any user pipeline with lambdas in Map, Filter, Reducer, or Router steps will fail with a PicklingError. This is a regression from the previous in-process execution model.


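The failure mode described here can be reproduced with the standard library alone. The step dict below is a hypothetical stand-in for a pipeline step, not the real Reducer API:

```python
import pickle

# Standard pickle serializes functions by qualified name, so a lambda
# (e.g. aggregate_func=lambda: OutcomesBuffer()) cannot round-trip
# through the Pool's result queue.
step = {"name": "reducer", "aggregate_func": lambda: "OutcomesBuffer"}

try:
    pickle.dumps(step)
    picklable = True
except (pickle.PicklingError, AttributeError, TypeError):
    picklable = False

print(picklable)  # False
```

Libraries such as cloudpickle or dill can serialize lambdas by value, which is one possible hedge if the subprocess loading model is kept.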
pipeline_globals: dict[str, Any] = {}

with open(application) as f:
exec(f.read(), pipeline_globals)

with open(config, "r") as config_file:
environment_config = yaml.safe_load(config_file)

config_template = importlib.resources.files("sentry_streams") / "config.json"
with config_template.open("r") as file:
schema = json.load(file)

try:
jsonschema.validate(environment_config, schema)
except Exception:
raise
validate_all_branches_have_sinks(pipeline)

metric_config = environment_config.get("metrics", {})
if metric_config.get("type") == "datadog":
@@ -114,8 +127,6 @@ def load_runtime(
metric_config = {}

assigned_segment_id = int(segment_id) if segment_id else None
pipeline: Pipeline[Any] = pipeline_globals["pipeline"]
Missing KeyError handling for datadog metrics config (Medium Severity)

When metric_config.get("type") == "datadog", the code directly accesses metric_config["host"] and metric_config["port"] without validation. If the config is malformed (missing these required keys), a KeyError will be raised. While run_with_config_file validates config via JSON schema, load_runtime is a public function that can be called directly without validation, as seen in tests, making this crash possible when datadog metrics are configured incorrectly.

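A defensive variant of the datadog branch (an assumed fix, not the merged code) would fail fast with a clear message instead of a bare KeyError:

```python
# Hypothetical malformed config: declares datadog but omits host/port.
metric_config = {"type": "datadog"}

error = None
if metric_config.get("type") == "datadog":
    # .get() instead of [] so missing keys produce a diagnosable error.
    host = metric_config.get("host")
    port = metric_config.get("port")
    if host is None or port is None:
        error = "datadog metrics config requires 'host' and 'port'"

print(error)  # datadog metrics config requires 'host' and 'port'
```
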
validate_all_branches_have_sinks(pipeline)
runtime: Any = load_adapter(adapter, environment_config, assigned_segment_id, metric_config)
translator = RuntimeTranslator(runtime)
Collaborator:

I think there is a use case we did not consider and it is an issue. The pipeline definition is coupled with the configuration file. If I rename a step in the pipeline definition and the config file references that step, the config may become invalid, though we would know that only at this step. That error would go to streaming.

Though if we changed that and sent config file mistakes to product, the system would be wrong anyway, as streaming owns the config file content.

Do we have a plan to address this issue?

Contributor Author:

I did not address this issue yet. The configuration yaml file should have its own validation step, and it has not been built yet. In any case, like you said, it will be a problem if this step is the first place an invalid deployment config file errors out.

Member:

@fpacifici not sure what the course of action here is though; a mismatch between deployment config and pipeline settings could be either team's responsibility. If a team renames its steps and does not update the config file in advance, then we should not have to react to that.

I think we should probably try to prevent this issue in CI if it becomes a big one.

Collaborator:

> I think we should probably try to prevent this issue in CI if it becomes a big one.

I think we do not have a precise story for how to manage this use case in a CI/CD environment. If I change the application, the change may be breaking with respect to what the config says. The way this works today would require us to switch the configuration at the same time the new sha is deployed, which is not something we can really enforce now.

I think this should be discussed in a followup.

@@ -124,6 +135,54 @@ def load_runtime(
return runtime


def load_runtime_with_config_file(
name: str,
log_level: str,
adapter: str,
config: str,
segment_id: Optional[str],
application: str,
) -> Any:
"""Load runtime from a config file path, returning the runtime object without calling run()."""
with open(config, "r") as f:
environment_config = yaml.safe_load(f)

config_template = importlib.resources.files("sentry_streams") / "config.json"
with config_template.open("r") as file:
schema = json.load(file)

jsonschema.validate(environment_config, schema)

sentry_sdk_config = environment_config.get("sentry_sdk_config")
if sentry_sdk_config:
sentry_sdk.init(dsn=sentry_sdk_config["dsn"])

return load_runtime(name, log_level, adapter, segment_id, application, environment_config)
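
The schema gate for the new sentry_sdk_config section can be exercised standalone. This sketch assumes the jsonschema package the runner already depends on, with the schema trimmed to the relevant definition:

```python
import jsonschema  # third-party, already a runner dependency

# Trimmed version of config.json: only the StreamingPlatformConfig shape.
schema = {
    "type": "object",
    "properties": {
        "sentry_sdk_config": {
            "type": "object",
            "properties": {"dsn": {"type": "string"}},
            "required": ["dsn"],
        }
    },
}

good = {"sentry_sdk_config": {"dsn": "https://platform@example.com/456"}}
bad = {"sentry_sdk_config": {}}  # missing the required dsn key

jsonschema.validate(good, schema)  # passes silently
try:
    jsonschema.validate(bad, schema)
    valid = True
except jsonschema.ValidationError:
    valid = False

print(valid)  # False
```
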


def run_with_config_file(
name: str,
log_level: str,
adapter: str,
config: str,
segment_id: Optional[str],
application: str,
) -> None:
"""
Load runtime from config file and run it. Used by the Python CLI.

NOTE: This function is separate from load_runtime_with_config_file() for a reason:
- load_runtime_with_config_file() returns the runtime WITHOUT calling .run()
- This allows the Rust CLI (run.rs) to call .run() itself
- Do NOT combine these functions - it would break the Rust CLI which needs to
control when .run() is called
"""
runtime = load_runtime_with_config_file(
name, log_level, adapter, config, segment_id, application
)
runtime.run()


@click.command()
@click.option(
"--name",
@@ -179,8 +238,7 @@ def main(
segment_id: Optional[str],
application: str,
Click option name mismatches function parameter name (High Severity)

The @click.option("--config", ...) decorator maps the CLI option to a parameter named config, but the run_with_config_file function declares this parameter as config_file. When invoked via the CLI, Click passes config=value while the function expects config_file, causing a TypeError. The Rust entry point works because it passes positional arguments, but the Python CLI entry point (if __name__ == "__main__") is completely broken.

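Click derives the keyword it passes from the option string, so the decorated function's parameter must be named exactly config. A minimal sketch (hypothetical command, assuming click is installed):

```python
import click
from click.testing import CliRunner

@click.command()
@click.option("--config")
def main(config: str) -> None:
    # Click binds --config to the parameter literally named `config`;
    # renaming the parameter to `config_file` would make invocation fail.
    click.echo(f"config={config}")

result = CliRunner().invoke(main, ["--config", "cfg.yaml"])
print(result.output)  # config=cfg.yaml
```
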
) -> None:
runtime = load_runtime(name, log_level, adapter, config, segment_id, application)
runtime.run()
run_with_config_file(name, log_level, adapter, config, segment_id, application)


if __name__ == "__main__":
20 changes: 7 additions & 13 deletions sentry_streams/src/run.rs
@@ -33,28 +33,22 @@ pub struct PyConfig {

#[derive(Parser, Debug)]
pub struct RuntimeConfig {
/// The name of the Sentry Streams application
#[arg(short, long)]
pub name: String,

/// The name of the Sentry Streams application
#[arg(short, long)]
pub log_level: String,

/// The name of the adapter
#[arg(short, long)]
pub adapter_name: String,
pub adapter: String,

/// The deployment config file path. Each config file currently corresponds to a specific pipeline.
#[arg(short, long)]
pub config_file: OsString,
pub config: OsString,

/// The segment id to run the pipeline for
#[arg(short, long)]
pub segment_id: Option<String>,

/// The name of the application
pub application_name: String,
pub application: String,
}

pub fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
@@ -76,14 +70,14 @@ pub fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
let runtime: Py<PyAny> = traced_with_gil!(|py| {
let runtime = py
.import("sentry_streams.runner")?
.getattr("load_runtime")?
.getattr("load_runtime_with_config_file")?
.call1((
runtime_config.name,
runtime_config.log_level,
runtime_config.adapter_name,
runtime_config.config_file,
runtime_config.adapter,
runtime_config.config,
runtime_config.segment_id,
runtime_config.application_name,
Rust calls .run() on None return value (High Severity)

The Rust entry point calls run_with_config_file and then attempts to call .run() on its return value. However, run_with_config_file already calls runtime.run() internally and returns None (its return type is explicitly -> None). This means the Rust code will crash with a PyO3 error when attempting to call .run() on Python's None object after the pipeline completes.

runtime_config.application,
))?
.unbind();
PyResult::Ok(runtime)
9 changes: 9 additions & 0 deletions sentry_streams/tests/fixtures/missing_pipeline.py
@@ -0,0 +1,9 @@
import sentry_sdk

# Initialize customer's Sentry SDK in the subprocess
sentry_sdk.init(dsn="https://customer@example.com/123")

from sentry_streams.pipeline import streaming_source

# Intentionally not defining 'pipeline' variable
my_pipeline = streaming_source(name="test", stream_name="test-stream")
4 changes: 4 additions & 0 deletions sentry_streams/tests/fixtures/simple_app.py
@@ -0,0 +1,4 @@
from sentry_streams.pipeline import streaming_source
from sentry_streams.pipeline.pipeline import DevNullSink

pipeline = streaming_source(name="test", stream_name="test-stream").sink(DevNullSink("test-sink"))
4 changes: 4 additions & 0 deletions sentry_streams/tests/fixtures/test_config.yaml
@@ -0,0 +1,4 @@
sentry_sdk_config:
dsn: "https://platform@example.com/456"
metrics:
type: dummy