Skip to content

Add replicas parameter to PipelineStep macro#231

Merged
fpacifici merged 6 commits intomainfrom
fpacifici/parallelism
Feb 11, 2026
Merged

Add replicas parameter to PipelineStep macro#231
fpacifici merged 6 commits intomainfrom
fpacifici/parallelism

Conversation

@fpacifici
Copy link
Collaborator

@fpacifici fpacifici commented Jan 30, 2026

Summary

This PR adds a replicas parameter to the PipelineStep macro in sentry_streams_k8s to support parallelism in Kafka streaming applications.

Ideally we should support something like this https://getsentry.github.io/streams/configure_pipeline.html. But that requires some design discussion about the semantics of the replicas field.
In the meantime we just need a way to set the replicas count on the k8s deployment. Adding the replicas count in the macro rather than in the config makes it easy to move it later.

Usage Example

{{ render_external(
    "sentry_streams_k8s.pipeline_step.PipelineStep",
    {
        "service_name": "my-service",
        "pipeline_name": "profiles",
        "replicas": 3,  # NEW: Set number of replicas
        # ... other parameters
    }
) }}

Adds a new `replicas` parameter to the PipelineStep macro to control the
number of replicas in generated Kubernetes deployments. The parameter
defaults to 1 to maintain backward compatibility with existing usage.

- Add replicas field to PipelineStepContext TypedDict
- Update parse_context to handle replicas with default value of 1
- Set replicas in deployment spec during manifest generation
- Add comprehensive test coverage for default and custom values
- Update docstring example to demonstrate usage

All tests pass and type checking succeeds.
@fpacifici fpacifici requested a review from a team as a code owner January 30, 2026 23:43
@github-actions
Copy link

github-actions bot commented Jan 30, 2026

Semver Impact of This PR

None (no version bump detected)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


New Features ✨

  • (sentry_integration) Exec app in subprocess by victoria-yining-huang in #217

Other

  • Add replicas parameter to PipelineStep macro by fpacifici in #231
  • Require sink at the end of a branch of the pipeline by fpacifici in #230
  • Move gcssink config override out of the adapter by fpacifici in #228

🤖 This preview updates automatically when you update the PR.

fpacifici and others added 2 commits February 2, 2026 14:15
Refactored conflict detection to use fail_on_scalar_overwrite parameter
in deepmerge instead of separate validation logic. This approach:

- Automatically detects all scalar field conflicts without maintaining explicit lists
- Provides accurate path tracking for better error messages
- Detects conflicts with both user template and base template values
- Allows dict merging and list appending as expected
- Only fails when scalar values actually differ

Changes:
- Add fail_on_scalar_overwrite parameter to deepmerge function
- Add ScalarOverwriteError exception with path tracking
- Remove TemplateConflictError and detect_scalar_overwrites helper
- Add 7 comprehensive tests for new behavior in test_merge.py
- Consolidate to single conflict test in test_pipeline_step.py

All tests pass (41 total).

Co-authored-by: Cursor <cursoragent@cursor.com>
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

f"Cannot merge key '{key}': base type is {type(base_value)} but override type is {type(override_value)}"
)
else:
# Scalar to scalar replacement
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TypeMismatchError lacks full path in error message

Low Severity

The _path tracking and path_str computation were added in this PR for better error messages, but TypeMismatchError still uses only the immediate key while ScalarOverwriteError uses the full path_str. This results in inconsistent error message quality—a deeply nested type mismatch shows only "Cannot merge key 'containers'" while a scalar overwrite shows "Cannot overwrite scalar at 'spec.template.spec.containers'". The path_str is also computed on every iteration but unused for most cases.

Fix in Cursor Fix in Web

@fpacifici fpacifici merged commit baa1bcf into main Feb 11, 2026
21 checks passed
@fpacifici fpacifici deleted the fpacifici/parallelism branch February 11, 2026 01:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants