feat(sentry_integration): exec app in subprocess #217

Merged
victoria-yining-huang merged 15 commits into main from vic/subprocess_to_build_pipeline
Jan 31, 2026

Conversation

Contributor

@victoria-yining-huang commented Dec 16, 2025

Summary

  • Refactor pipeline loading to use subprocess isolation for improved error handling and monitoring
  • Integrate Sentry SDK for streaming platform observability
  • Add test coverage for subprocess communication and error scenarios

Note:

  • this Sentry integration support is not added to the Rust entry point

@victoria-yining-huang changed the title from "i think i added python sentry" to "demo(sentry_integration): alerts in two different sentry projects" on Dec 16, 2025
@victoria-yining-huang changed the title from "demo(sentry_integration): alerts in two different sentry projects" to "wip(sentry_integration): exec app in subprocess" on Dec 17, 2025
@victoria-yining-huang force-pushed the vic/subprocess_to_build_pipeline branch from d575941 to b0e6a25 on January 7, 2026 21:17
@victoria-yining-huang victoria-yining-huang marked this pull request as ready for review January 7, 2026 23:09
@victoria-yining-huang victoria-yining-huang requested a review from a team as a code owner January 7, 2026 23:09
@victoria-yining-huang changed the title from "wip(sentry_integration): exec app in subprocess" to "feat(sentry_integration): exec app in subprocess" on Jan 7, 2026
@untitaker
Member

Also, it would be nice to have some sort of test for this.

You can test using custom transports in the SDK.

Collaborator

@fpacifici fpacifici left a comment

The current approach means that if no customer Sentry is configured, pickling errors and pipeline build errors will fail silently and nobody gets alerted. Should we mandate that every customer app have Sentry configured?

No, we build a platform; we would not mandate how clients manage their errors.
Taking a step back, I think we should be notified even when the application code fails at startup; that is still useful for us.
I think we should get those errors back and send them to our integration as well. We can tag them or lower the severity so we can filter them out, but we should have visibility into them no matter how the customer sets up their integrations.

We do not want product to get infra errors in their integration, but I think we should at least have visibility into product errors whether or not the product sets up Sentry.

assigned_segment_id = int(segment_id) if segment_id else None
pipeline: Pipeline[Any] = pipeline_globals["pipeline"]
runtime: Any = load_adapter(adapter, environment_config, assigned_segment_id, metric_config)
translator = RuntimeTranslator(runtime)
Collaborator

I think there is a use case we did not consider, and it is an issue.
The pipeline definition is coupled with the configuration file. If I rename a step in the pipeline definition and the config file references that step, the config may become invalid, but we would only find out at this step, and that error would go to streaming.

Though if we changed that and sent config file mistakes to product, the system would be wrong anyway, as streaming owns the config file content.

Do we have a plan to address this issue?

Contributor Author

I did not address this issue yet. The configuration YAML file should have its own validation step, which has not been built yet. In any case, as you said, it will be a problem if this step is the first place an invalid deployment config file errors out.

Member

@fpacifici not sure what the course of action here is, though; a mismatch between deployment config and pipeline settings could be either team's responsibility. If a team renames its steps and does not update the config file in advance, then we should not have to react to that.

I think we should probably try to prevent this issue in CI if it becomes a big one.

Collaborator

I think we should probably try to prevent this issue in CI if it becomes a big one.

I think we do not have a precise story for how to manage this use case in a CI/CD environment.
If I change the application, the change may be breaking with respect to what the config says. The way this works today would require us to switch the configuration at the same time the new SHA is deployed, which is not something we can really enforce right now.

I think this should be discussed in a follow-up.


@victoria-yining-huang
Contributor Author

@fpacifici added new feature:

  • when the subprocess fails, the main process' Sentry receives a generic CalledProcessError exception, while the subprocess' Sentry (if configured) receives the specific Python exceptions.

@victoria-yining-huang
Contributor Author

@untitaker added tests using transports


@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 8 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

@github-actions

github-actions bot commented Jan 23, 2026

Semver Impact of This PR

🟡 Minor (new features)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


New Features ✨

  • (sentry_integration) Exec app in subprocess by victoria-yining-huang in #217

Other

  • Require sink at the end of a branch of the pipeline by fpacifici in #230
  • Move gcssink config override out of the adapter by fpacifici in #228

🤖 This preview updates automatically when you update the PR.

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 3 potential issues.


@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.


adapter: str,
config_file: str,
segment_id: Optional[str],
application: str,

Click option name mismatches function parameter name

High Severity

The @click.option("--config", ...) decorator maps the CLI option to a parameter named config, but the run_with_config_file function declares this parameter as config_file. When invoked via CLI, Click passes config=value but the function expects config_file, causing a TypeError. The Rust entry point works because it passes positional arguments, but the Python CLI entry point (if __name__ == "__main__") is completely broken.

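For reference, Click can bind an option to a differently named parameter by passing the target name explicitly, which avoids this class of mismatch. A minimal sketch (the option and parameter names here are illustrative, not the PR's actual signature):

```python
import click
from click.testing import CliRunner

@click.command()
# Passing "config_file" as the second positional argument tells Click to
# bind --config to the parameter named config_file, instead of deriving
# the parameter name ("config") from the option string.
@click.option("--config", "config_file", type=str, required=True)
def run_with_config_file(config_file: str) -> None:
    click.echo(config_file)

# Exercise the CLI path in-process, the same way a unit test would.
result = CliRunner().invoke(run_with_config_file, ["--config", "deploy.yaml"])
print(result.output.strip())  # prints "deploy.yaml"
```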

) -> None:
runtime = load_runtime(name, log_level, adapter, config, segment_id, application)
runtime.run()
run_with_config_file(name, log_level, adapter, config, segment_id, application)

Calling Click command with positional args breaks invocation

High Severity

The main function calls run_with_config_file with positional arguments, but run_with_config_file is a Click Command object (due to @click.command() decorator). Click's Command.__call__ forwards positional args to its internal main(args, prog_name, complete_var, standalone_mode, ...) method, so the arguments are misinterpreted as Click's internal parameters rather than the decorated function's parameters. The Rust code calling via call1 has the same issue.

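A common way out (sketched here with illustrative names) is to keep the logic in a plain function and have the Click command delegate to it; programmatic callers such as `main()` or a Rust entry point then never go through `Command.__call__`:

```python
import click

def run_with_config_file(name: str, log_level: str) -> str:
    # Plain function: safe to call positionally from main() or an
    # embedding entry point.
    return f"running {name} at {log_level}"

@click.command()
@click.option("--name", required=True)
@click.option("--log-level", "log_level", default="INFO")
def cli(name: str, log_level: str) -> None:
    # Thin CLI wrapper around the plain function.
    click.echo(run_with_config_file(name, log_level))

# Calling cli("myname", "INFO") would route the strings into Click's
# Command.main(args, prog_name, ...) and misparse them; the plain
# function (or cli.callback) behaves as expected.
print(run_with_config_file("myname", "INFO"))  # prints "running myname at INFO"
```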

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 3 potential issues.


name: str,
log_level: str,
adapter: str,
config_file: str,

Click option name doesn't match function parameter

High Severity

The Click option --config (line 171-177) doesn't match the function parameter config_file (line 192). Click derives the parameter name from the option name by stripping --, so it expects a parameter named config. When the CLI is invoked, Click will fail to map the --config argument to the config_file parameter, causing a runtime error.


# Note: Customer print() and logging statements (redirected to stderr)
# do not trigger platform Sentry alerts.
with multiprocessing.Pool(processes=1) as pool:
pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))

Subprocess pipeline return requires pickling locally-defined functions

Medium Severity

The new subprocess isolation uses multiprocessing.Pool.apply() which requires pickling the Pipeline object to return it to the main process. Functions defined locally in customer application files (via exec()) cannot be properly pickled because Python can't find them by module reference. Existing examples like gcs_sink.py define local functions (e.g., generate_files) used in GCSSink.object_generator that would fail with a PicklingError after this change. The old code exec'd directly in the main process without pickling.

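The failure mode is easy to reproduce with pickle alone, since `Pool.apply()` pickles the return value to ship it back to the parent process. A minimal sketch, independent of the actual Pipeline class:

```python
import pickle

# Plain data crosses the process boundary fine: pickle round-trips it.
steps = {"source": "events", "sink": "gcs"}
assert pickle.loads(pickle.dumps(steps)) == steps

# A step holding a lambda cannot be pickled: pickle serializes functions
# by reference (module + qualified name), and a lambda or a function
# defined inside an exec'd customer file has no importable name.
try:
    pickle.dumps({"map": lambda msg: msg.upper()})
    failed = False
except Exception:  # PicklingError or AttributeError depending on context
    failed = True
print("lambda not picklable:", failed)  # prints "lambda not picklable: True"
```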

PyResult::Ok(runtime)
})?;
runtime_config.application,
))?;

Rust entry point calls Click command with wrong arguments

High Severity

The run_with_config_file function is decorated with @click.command(), making it a Click Command object rather than a regular function. The Rust code and main() function call it directly with positional arguments, but Click Command objects expect CLI-style invocation. Positional args get interpreted as Click's internal parameters (args, prog_name, standalone_mode), not the decorated function's parameters, causing incorrect behavior or a TypeError.


@victoria-yining-huang force-pushed the vic/subprocess_to_build_pipeline branch from a642e82 to 6783bc9 on January 29, 2026 05:23
@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.


PyResult::Ok(runtime)
})?;
runtime_config.application,
))?;

Rust entry point calls click-decorated function incorrectly

High Severity

The Rust code calls run_with_config_file directly with positional arguments, but this function is decorated with @click.command(). Click transforms it into a Command object that interprets the call arguments as CLI argument strings to parse. Passing values like ("myname", "INFO", "arroyo", ...) will fail because click expects options in --name VALUE format, not raw positional values. The first value matches the application argument, but subsequent values become unexpected extra arguments causing a parse error.


# Note: Customer print() and logging statements (redirected to stderr)
# do not trigger platform Sentry alerts.
with multiprocessing.Pool(processes=1) as pool:
pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))

Subprocess cannot return Pipeline containing lambda functions

High Severity

multiprocessing.Pool.apply() pickles the return value to transfer it from the subprocess to the main process. However, Pipeline objects commonly contain lambda functions (e.g., in Map, Filter, Router steps), and Python's standard pickle cannot serialize lambdas. Any real-world pipeline with transform functions will fail with _pickle.PicklingError: Can't pickle <function <lambda>>. The tests only pass because they use a trivial streaming_source with no transforms.


Contributor Author

known constraint

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 3 potential issues.


except Exception:
raise
with multiprocessing.Pool(processes=1) as pool:
pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))

Subprocess pickling fails for pipelines with lambda functions

High Severity

The multiprocessing.Pool.apply() call returns the Pipeline object from the subprocess by pickling it. However, Pipeline objects commonly contain steps with function attributes holding lambda functions (as seen throughout test files like test_runner.py and test_chain.py). Lambda functions cannot be pickled with Python's default pickle module, causing a PicklingError at runtime. The test in test_load_runtime.py only uses a minimal pipeline without Map/Filter steps, so this issue isn't caught.


raise
with multiprocessing.Pool(processes=1) as pool:
pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))
logger.info("Successfully loaded pipeline from subprocess")

Pipeline sink validation removed without replacement

Medium Severity

The validate_all_branches_have_sinks(pipeline) call was removed from load_runtime without replacement. This validation ensured all pipeline branches terminate with a Sink step. Without it, pipelines with leaf nodes that aren't sinks are silently accepted, potentially causing data loss since data flows to nodes that discard it. The PR description mentions subprocess isolation and Sentry integration, not intentional removal of validation.


let runtime = py
.import("sentry_streams.runner")?
.getattr("load_runtime")?
py.import("sentry_streams.runner")?

sys.path modifications not inherited by subprocess on macOS/Windows

Medium Severity

When the Rust entry point sets sys.path via module_paths, these modifications aren't inherited by the subprocess on macOS/Windows. Those platforms use "spawn" for multiprocessing (fresh Python interpreter), not "fork" (copy of parent process). Any imports in the application file that depend on the custom paths will fail with ModuleNotFoundError in the subprocess.

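A common workaround (sketched here with a hypothetical helper, not what the PR currently does) is to pass the extra paths to the worker explicitly instead of relying on fork inheriting them:

```python
import multiprocessing
import sys

def _load_with_paths(extra_paths, application):
    # Under "spawn" (the default on macOS/Windows) the worker starts a
    # fresh interpreter, so sys.path changes made at runtime by the parent
    # are lost; re-apply them explicitly from the arguments.
    sys.path[:0] = [p for p in extra_paths if p not in sys.path]
    return extra_paths[0] in sys.path  # stand-in for the real pipeline load

def load_in_subprocess(extra_paths):
    # Force "spawn" so behaviour matches macOS/Windows on every platform.
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=1) as pool:
        return pool.apply(_load_with_paths, (extra_paths, "app.py"))

if __name__ == "__main__":
    print("paths visible in worker:", load_in_subprocess(["/opt/custom_modules"]))
```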

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 4 potential issues.


raise
with multiprocessing.Pool(processes=1) as pool:
pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))
logger.info("Successfully loaded pipeline from subprocess")

Pipeline pickling fails with lambda functions

High Severity

The multiprocessing.Pool.apply() call attempts to pickle the Pipeline object returned from _load_pipeline() to transfer it back to the main process. Pipelines commonly contain lambda functions in Map, Filter, Reducer, and other steps, but lambda functions cannot be pickled by Python's default pickle module. This causes a PicklingError at runtime when loading any pipeline containing lambdas, which is the standard pattern throughout the codebase (as seen in examples and tests).


raise ValueError("Application file must define a 'pipeline' variable")

pipeline = cast(Pipeline[Any], pipeline_globals["pipeline"])
return pipeline

Platform Sentry not initialized in subprocess

Medium Severity

The platform's Sentry SDK is initialized with streaming_platform_config DSN in run_with_config_file (main process) but not in the _load_pipeline subprocess where customer code executes. Exceptions occurring during pipeline loading in the subprocess won't be captured by the platform's Sentry, only propagating as CalledProcessError to the main process. This contradicts the config schema which provides platform-level DSN configuration, suggesting platform Sentry should monitor both processes.

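If platform visibility inside the worker is wanted, `multiprocessing.Pool` accepts an `initializer` that runs once per worker before any task is dispatched; the real code would call `sentry_sdk.init` there. A sketch (the Sentry call is only indicated in a comment, and the DSN plumbing is hypothetical):

```python
import multiprocessing

_PLATFORM_DSN = None  # set in each worker by the initializer

def _init_worker(dsn):
    # Runs once per worker process before tasks. The real implementation
    # would call sentry_sdk.init(dsn=dsn) here so exceptions raised while
    # exec'ing customer code are captured by the platform project too.
    global _PLATFORM_DSN
    _PLATFORM_DSN = dsn

def _load_pipeline(application):
    # Stand-in for the real loader; shows initializer state is visible.
    return f"loaded {application} (worker dsn={_PLATFORM_DSN})"

def load_with_platform_sentry(application, dsn):
    with multiprocessing.Pool(
        processes=1, initializer=_init_worker, initargs=(dsn,)
    ) as pool:
        return pool.apply(_load_pipeline, (application,))

if __name__ == "__main__":
    print(load_with_platform_sentry("simple_app.py", "https://platform@example.com/456"))
```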

metric_config = {}

assigned_segment_id = int(segment_id) if segment_id else None
pipeline: Pipeline[Any] = pipeline_globals["pipeline"]

Missing KeyError handling for datadog metrics config

Medium Severity

When metric_config.get("type") == "datadog", the code directly accesses metric_config["host"] and metric_config["port"] without validation. If the config is malformed (missing these required keys), a KeyError will be raised. While run_with_config_file validates config via JSON schema, load_runtime is a public function that can be called directly without validation, as seen in tests, making this crash possible when datadog metrics are configured incorrectly.

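A defensive accessor (illustrative, not the PR's code) would surface the problem as a clear error instead of a bare KeyError:

```python
def datadog_settings(metric_config: dict) -> tuple:
    # Validate up front so a malformed config fails with an actionable
    # message instead of a bare KeyError deep inside load_runtime.
    missing = [key for key in ("host", "port") if key not in metric_config]
    if missing:
        raise ValueError(f"datadog metrics config missing keys: {missing}")
    return metric_config["host"], int(metric_config["port"])

print(datadog_settings({"type": "datadog", "host": "localhost", "port": 8125}))
# prints "('localhost', 8125)"
```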

from sentry_streams.pipeline import streaming_source
pipeline = streaming_source(name="test", stream_name="test-stream")
"""
)

Test pipeline missing required sink

Medium Severity

The test creates a pipeline containing only a streaming_source without any sink, but expects load_runtime to succeed. However, load_runtime calls validate_all_branches_have_sinks at line 106 of runner.py, which will detect that the source is a leaf node but not a Sink instance, causing an InvalidPipelineError to be raised. The test will fail when executed because it doesn't handle this validation error.


remove main

renamed some functions

up to date

set subprocess path
@victoria-yining-huang force-pushed the vic/subprocess_to_build_pipeline branch from b9d65c8 to 4ee905c on January 29, 2026 22:48
@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.


except Exception:
raise
with multiprocessing.Pool(processes=1) as pool:
pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))

Pipeline pickling fails for lambdas in multiprocessing

High Severity

The multiprocessing.Pool.apply call returns the Pipeline object by pickling it back to the main process. However, Pipeline objects often contain lambda functions or closures in steps like Map, Filter, or Router (e.g., Map(name="x", function=lambda msg: ...)). Python's standard pickle cannot serialize lambda functions, so any pipeline using lambdas will raise a PicklingError. The tests pass because they only verify simple source-to-sink pipelines without transformation steps.


@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 3 potential issues.


runtime_config.adapter,
runtime_config.config,
runtime_config.segment_id,
runtime_config.application_name,

Rust calls .run() on None return value

High Severity

The Rust entry point calls run_with_config_file and then attempts to call .run() on its return value. However, run_with_config_file already calls runtime.run() internally and returns None (its return type is explicitly -> None). This means the Rust code will crash with a PyO3 error when attempting to call .run() on Python's None object after the pipeline completes.


raise
with multiprocessing.Pool(processes=1) as pool:
pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))
logger.info("Successfully loaded pipeline from subprocess")

Pipeline pickling fails for lambda functions and inline callables

High Severity

The multiprocessing.Pool.apply call requires the returned Pipeline object to be pickled across process boundaries. Pipeline objects can contain Callable fields in steps like Map, Filter, FlatMap, Router, and Reducer. Lambda functions and functions defined inline in the user's application file are not picklable, causing a PicklingError. This is a regression from the previous in-process exec() approach where pickling was not required.


Collaborator

@fpacifici fpacifici left a comment

Please address all comments before merging and ensure the rust CLI works.

let runtime = py
.import("sentry_streams.runner")?
.getattr("load_runtime")?
.getattr("run_with_config_file")?
Collaborator

This does not seem the right method.
Please ensure this CLI works before merging.

Comment on lines 81 to 85
app_file.write_text(
"""
from sentry_streams.pipeline import streaming_source
pipeline = streaming_source(name="test", stream_name="test-stream")
"""
Collaborator

the benefits of having type checking are worth it here

The benefit of having the code in a proper Python file is that the interpreter, CI, type checking, and lint will tell you if something is wrong when a single character changes by mistake; in a string that contains code, they cannot. Please do not leave it as it is.

On the other hand, I do not see any gain in keeping Python code in a string.

)

app_file = temp_fixture_dir / "simple_app.py"
app_file.write_text(
Collaborator

Please do not leave this module in a string. Have a proper python file.

Comment on lines 38 to 61

@pytest.fixture(autouse=True)
def reset_metrics_backend() -> Generator[None, None, None]:
"""Reset the global metrics backend between tests."""
from sentry_streams import metrics

try:
from arroyo.utils import metrics as arroyo_metrics

has_arroyo = True
except ImportError:
has_arroyo = False

# Reset before each test
metrics.metrics._metrics_backend = None
if has_arroyo:
arroyo_metrics._metrics_backend = None

yield

# Reset to None after each test
metrics.metrics._metrics_backend = None
if has_arroyo:
arroyo_metrics._metrics_backend = None
Collaborator

Resurfacing my comment from before.
None of these are used in any assertion in the tests:

  • CaptureTransport
  • reset_metrics_backend

Please remove

Comment on lines 74 to 102
) -> None:
sentry_sdk.init(
dsn="https://platform@example.com/456",
transport=platform_transport,
)

app_file = temp_fixture_dir / "simple_app.py"
app_file.write_text(
"""
from sentry_streams.pipeline import streaming_source, StreamSink
pipeline = streaming_source(name="test", stream_name="test-stream").sink(StreamSink("test-sink", stream_name="test-output"))
"""
)

with (
patch("sentry_streams.runner.load_adapter") as mock_load_adapter,
patch("sentry_streams.runner.iterate_edges") as mock_iterate_edges,
):
mock_runtime = type(
"MockRuntime",
(),
{
"run": lambda self: None,
"source": lambda self, step: "mock_stream",
"complex_step_override": lambda self: {},
},
)()
mock_load_adapter.return_value = mock_runtime

Collaborator

Resurfacing the comments from the previous review:

  • Please use the dummyAdapter rather than mocking everything
  • Please do not inline python code in a string
  • The custom transport is not being used anywhere in assertions. Please remove it
  • Same for the metrics. Please remove reset_metrics_backend

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 3 potential issues.


runtime_config.adapter,
runtime_config.config,
runtime_config.segment_id,
runtime_config.application_name,

Rust entry point calls .run() on None returned value

High Severity

The Rust entry point in run.rs calls run_with_config_file and then attempts to invoke .run() on the returned value. However, run_with_config_file returns None (as indicated by its -> None type annotation) and already calls runtime.run() internally. This causes the Rust code to call .run() on None, which will crash with an AttributeError at runtime, completely breaking the Rust entry point.


pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))
logger.info("Successfully loaded pipeline from subprocess")
except Exception:
raise

Pointless try-except block immediately re-raises exception

Low Severity

The try-except block around the multiprocessing pool catches Exception and immediately re-raises it without any additional handling, logging, or cleanup. This is redundant code that should be removed.


try:
jsonschema.validate(environment_config, schema)
except Exception:
raise

Pointless try-except block immediately re-raises exception

Low Severity

The try-except block around jsonschema.validate catches Exception and immediately re-raises it without any additional handling. This is redundant code that should be removed.


@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 4 potential issues.


runtime_config.adapter,
runtime_config.config,
runtime_config.segment_id,
runtime_config.application_name,

Rust entry point calls .run() on None return value

High Severity

The Rust entry point calls run_with_config_file which returns None (since it has -> None return type and already calls runtime.run() internally), but then the Rust code stores this result and tries to call .run() on it. This will crash with an AttributeError when the Rust binary is used, as NoneType has no run method.
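
The contract mismatch can be sketched in a few lines (hypothetical names, not the project's actual API): a loader that both runs the runtime and implicitly returns `None` leaves a caller that expects a runtime object with nothing to call `.run()` on. One fix is to return the runtime and let the caller invoke `.run()` exactly once:

```python
# Sketch of the bug and one possible fix; Runtime and the loader names are
# illustrative stand-ins, not the real run.rs / run_with_config_file API.
class Runtime:
    def run(self):
        return "running"

def run_with_config_file_buggy(path):
    Runtime().run()
    # falls off the end: implicitly returns None, so a caller doing
    # run_with_config_file_buggy(path).run() raises AttributeError

def run_with_config_file_fixed(path):
    return Runtime()  # caller decides when to call .run()

runtime = run_with_config_file_fixed("config.yaml")
print(runtime.run())
```

Either the Python side should return the runtime without running it, or the Rust side should stop calling `.run()` on the result; doing both runs the pipeline twice.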

Additional Locations (1)


try:
    with multiprocessing.Pool(processes=1) as pool:
        pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))
        logger.info("Successfully loaded pipeline from subprocess")

Subprocess pipeline loading breaks lambdas in pipeline definitions

High Severity

The new subprocess approach using multiprocessing.Pool.apply() requires the Pipeline object to be pickled when returning from the subprocess. However, lambdas are not picklable with standard pickle, and the existing billing.py example uses aggregate_func=lambda: OutcomesBuffer(). This causes a PicklingError at runtime, breaking previously valid pipeline definitions that use lambdas for functions like aggregate_func, routing_function, or function in Map/Filter steps.
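
The failure mode is easy to reproduce without a full pipeline: `Pool.apply()` pickles the worker's return value to send it back to the parent, and the stdlib pickler cannot serialize lambdas, while a module-level named callable serializes fine. A minimal sketch (dict stands in for the Pipeline object):

```python
import pickle

# Stand-in for a pipeline definition using a lambda, as in
# aggregate_func=lambda: OutcomesBuffer() from billing.py.
def load_pipeline_with_lambda():
    return {"aggregate_func": lambda: []}

# Stand-in for the fix: reference a named, importable callable instead.
def load_pipeline_with_named_factory():
    return {"aggregate_func": list}

def picklable(obj):
    """Return True if obj survives the round-trip Pool.apply() requires."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

print(picklable(load_pipeline_with_lambda()))         # lambdas don't pickle
print(picklable(load_pipeline_with_named_factory()))  # named callables do
```

Mitigations include requiring named top-level callables in pipeline definitions, or serializing with a third-party pickler such as cloudpickle/dill that supports lambdas.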


        pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))
        logger.info("Successfully loaded pipeline from subprocess")
except Exception:
    raise

Redundant try/except block that only re-raises

Low Severity

The try/except Exception: raise block wrapping the multiprocessing Pool code is redundant. It catches the exception and immediately re-raises it without any modification, logging, or cleanup. The code would behave identically without the try/except wrapper.


try:
    jsonschema.validate(environment_config, schema)
except Exception:
    raise

Redundant try/except block that only re-raises

Low Severity

The try/except Exception: raise block wrapping jsonschema.validate is redundant. It catches the exception and immediately re-raises it without any transformation or additional handling. The call to jsonschema.validate(environment_config, schema) can stand alone.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.


try:
    with multiprocessing.Pool(processes=1) as pool:
        pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))
        logger.info("Successfully loaded pipeline from subprocess")

Pipeline objects with lambdas fail subprocess pickle

Medium Severity

The multiprocessing.Pool.apply() call requires the returned Pipeline object to be pickled for transfer from the subprocess to the parent process. Pipeline objects can contain lambda functions in step attributes like Map.function, Filter.function, Router.routing_function, and GCSSink.object_generator. Since lambdas and closures cannot be pickled with Python's standard pickle module, any pipeline using lambda functions will fail with a cryptic pickle error during loading. This is a regression from the previous implementation where exec() ran in the same process, avoiding pickle serialization entirely.


        pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))
        logger.info("Successfully loaded pipeline from subprocess")
except Exception:
    raise

Redundant try-except block that only re-raises

Low Severity

The try: ... except Exception: raise block around the multiprocessing pool is redundant. Catching an exception only to immediately re-raise it without any modification, logging, or cleanup adds no functionality. The multiprocessing code can be simplified by removing the try-except wrapper entirely.


try:
    jsonschema.validate(environment_config, schema)
except Exception:
    raise

Redundant try-except block that only re-raises

Low Severity

The try: ... except Exception: raise block around jsonschema.validate() is redundant. Catching an exception only to immediately re-raise it provides no value. The jsonschema.validate() call can stand alone without this wrapper.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


    datefmt="%Y-%m-%d %H:%M:%S",
)
with multiprocessing.Pool(processes=1) as pool:
    pipeline: Pipeline[Any] = pool.apply(_load_pipeline, (application,))

Pipeline with lambdas fails subprocess pickling

High Severity

Using multiprocessing.Pool.apply() to load the pipeline in a subprocess requires pickling the Pipeline object to return it to the main process. Python's standard pickle cannot serialize lambda functions. The existing billing.py example uses aggregate_func=lambda: OutcomesBuffer(), and any user pipeline with lambdas in Map, Filter, Reducer, or Router steps will fail with a PicklingError. This is a regression from the previous in-process execution model.


@victoria-yining-huang victoria-yining-huang merged commit 60600e5 into main Jan 31, 2026
21 checks passed