feat(k8s): Add multi-processing support to PipelineStep macro #232

Merged
fpacifici merged 3 commits into main from fpacifici/parallelism_support on Feb 11, 2026

Conversation

fpacifici (Collaborator) commented on Jan 31, 2026

Summary

Adds automatic multi-processing support to the PipelineStep Kubernetes macro. When a pipeline configuration specifies parallelism.multi_process.processes, the macro now:

  • Automatically multiplies CPU and memory resources by the process count
  • Mounts the /dev/shm shared-memory volume required for Python multiprocessing IPC (see the illustration after this list)
  • Validates that only one segment specifies a parallelism configuration
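
For context, Python's multiprocessing shared memory is backed by files under /dev/shm on Linux, which is what this mount provides; a minimal illustration (not code from this PR):

from multiprocessing import shared_memory

# On Linux this allocates a file under /dev/shm (e.g. /dev/shm/psm_...).
# In a container without the mount, /dev/shm defaults to a small tmpfs
# (64Mi under Docker/containerd), so larger allocations fail with
# "no space left on device".
shm = shared_memory.SharedMemory(create=True, size=1024)
shm.buf[:5] = b"hello"
shm.close()
shm.unlink()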

Implementation Details

Updated: build_container()

  • Added a process_count parameter
  • Multiplies CPU/memory by the process count: 1000m CPU × 4 processes = 4000m CPU
  • Adds a /dev/shm volume mount when process_count > 1 (sketched below)
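
A minimal sketch of what build_container() now does; the signature and dict structure are assumptions, since the macro code itself is not shown in this thread:

def build_container(name, cpu_millicores, memory_mb, process_count=1):
    # Scale the per-process request by the number of worker processes,
    # e.g. 1000m CPU x 4 processes = 4000m CPU.
    container = {
        "name": name,
        "resources": {
            "requests": {
                "cpu": f"{cpu_millicores * process_count}m",
                "memory": f"{memory_mb * process_count}Mi",
            }
        },
    }
    if process_count > 1:
        # Shared-memory mount required for Python multiprocessing IPC.
        container["volumeMounts"] = [{"name": "dshm", "mountPath": "/dev/shm"}]
    return container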

Updated: PipelineStep.run()

  • Detects the multiprocessing configuration
  • Validates that only one segment has parallelism
  • Adds a dshm emptyDir volume with medium: "Memory" to the deployment (sketched below)
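
The dshm volume is a standard memory-backed Kubernetes emptyDir; a minimal sketch of the addition to the pod spec, assuming the deployment is built as a plain dict:

# A Memory-medium emptyDir lives in RAM and counts against the container's
# memory limit, which is another reason the memory request is multiplied.
pod_spec = deployment["spec"]["template"]["spec"]
pod_spec.setdefault("volumes", []).append(
    {"name": "dshm", "emptyDir": {"medium": "Memory"}}
)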

Fixes

Fixes STREAM-707

fpacifici requested a review from a team as a code owner on January 31, 2026 01:09

github-actions bot commented on Jan 31, 2026

Semver Impact of This PR

🟡 Minor (new features)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


New Features ✨

  • (k8s) Add multi-processing support to PipelineStep macro by fpacifici in #232
  • (sentry_integration) Exec app in subprocess by victoria-yining-huang in #217

Other

  • Require sink at the end of a branch of the pipeline by fpacifici in #230
  • Move gcssink config override out of the adapter by fpacifici in #228

🤖 This preview updates automatically when you update the PR.

cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

    cpu_per_process,
    memory_per_process,
    segment_id,
    process_count,

Multiprocessing resources applied to wrong segments

High Severity

The process_count from get_multiprocess_config() is applied unconditionally to the current segment_id being deployed, without checking if that segment actually has parallelism configured. If segment 0 has no multiprocessing but segment 1 has processes: 4, deploying segment 0 would incorrectly receive 4x resources and the /dev/shm volume. The code retrieves segments_with_parallelism but never checks whether segment_id is in that list before applying resource scaling.
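
A minimal sketch of the guard Bugbot describes as missing (variable and helper names follow its description; whether the guard is actually needed is debated below):

# Hypothetical: apply scaling only when the deployed segment is parallel.
if segment_id in segments_with_parallelism:
    process_count = get_multiprocess_config(pipeline_config)["processes"]
else:
    process_count = 1  # no resource multiplication, no /dev/shm volume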


Member

Can you add a test to cover this case? I don't think this is a valid bug, but what is expected to happen when multiple segments are passed in and the first one is not the one that requires parallelism?

fpacifici (Collaborator, Author)

I was about to add the test but then I figured out that the comment is wrong.

This is an example of a pipeline with parallelism:

pipeline:
  segments:
    - steps_config:
        myinput:
          starts_segment: True
          bootstrap_servers: ["127.0.0.1:9092"]
          parallelism: 1
        parser:
          # Parser is the beginning of the segment.
          # All Map steps in the same segment are chained
          # together in the same process.
          #
          # When adding a step to the segment that is not
          # a map, we need to create a new segment, as these
          # cannot be run in a multi-process step.
          starts_segment: True
          parallelism:
            multi_process:
              processes: 4
              batch_size: 1000
              batch_time: 0.2
        mysink:
          starts_segment: True
          bootstrap_servers: ["127.0.0.1:9092"]

We pass --segment-id=0 to run it, but technically speaking, the parallel segment is the second one.
This is because segments do not have sound semantics: we have segments in the segments list, but we can also start segments implicitly inside a single step.

We need to figure this out, though we cannot assert that the segment with the parallelism config will be the one passed to the consumer, as this is never the case.

if not multi_process:
    continue

processes = multi_process.get("processes")

Missing type check causes crash on non-dict multi_process

Medium Severity

The code checks isinstance(parallelism, dict) before calling .get() on it, but there's no corresponding check for multi_process. If multi_process is a truthy non-dict value (e.g., multi_process: true or multi_process: 1 in YAML), the condition if not multi_process passes, and then multi_process.get("processes") crashes with AttributeError: 'bool' object has no attribute 'get'.
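
A minimal sketch of the suggested guard, mirroring the existing isinstance(parallelism, dict) check:

multi_process = parallelism.get("multi_process")
if not isinstance(multi_process, dict):
    # Covers None as well as malformed values like `multi_process: true`.
    continue

processes = multi_process.get("processes")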


fpacifici and others added 2 commits on February 10, 2026 17:13
Co-authored-by: Cursor <cursoragent@cursor.com>
fpacifici force-pushed the fpacifici/parallelism_support branch from 84dc0d6 to 9fea014 on February 11, 2026 01:24
fpacifici merged commit d9d7a58 into main on Feb 11, 2026
21 checks passed
fpacifici deleted the fpacifici/parallelism_support branch on February 11, 2026 16:18