Making URL Generator's / FilePartioningStage Xenna spec to be one worker per node#1350
Making URL Generator's / FilePartioningStage Xenna spec to be one worker per node#1350
Conversation
…orStage Signed-off-by: Abhinav Garg <abhinavg@stanford.edu>
There was a problem hiding this comment.
Greptile Overview
Greptile Summary
This PR adds xenna_stage_spec() methods to FilePartitioningStage and URLGenerationStage to configure them to use one worker per node when executed with the Xenna backend.
Key Changes:
- Both stages now return
{"num_workers_per_node": 1}from theirxenna_stage_spec()methods - This optimization is justified because both stages start from
_EmptyTask(a single empty task) and fan out to multiple tasks via theirprocess()methods - Having multiple workers per node would be inefficient since there's only one initial task to process per stage
Rationale:
These stages perform fanout operations (converting one _EmptyTask into many FileGroupTask objects). With multiple workers per node, all workers would compete for the single input task, leading to resource waste. One worker per node is optimal for this pattern.
Confidence Score: 5/5
- This PR is safe to merge with minimal risk
- The changes are straightforward performance optimizations that follow established patterns in the codebase. Both methods are correctly implemented, matching the base class interface and similar implementations in other stages like
DocumentDownloadStage. The rationale is sound: stages that start from_EmptyTaskand fan out don't benefit from multiple workers per node. - No files require special attention
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| nemo_curator/stages/file_partitioning.py | 5/5 | added xenna_stage_spec() method to configure one worker per node for Xenna executor |
| nemo_curator/stages/text/download/base/url_generation.py | 5/5 | added xenna_stage_spec() method to configure one worker per node for Xenna executor |
Sequence Diagram
sequenceDiagram
participant XE as XennaExecutor
participant FPS as FilePartitioningStage
participant UGS as URLGenerationStage
participant XW as Xenna Worker
Note over XE,XW: Before PR: Multiple workers per node (default)
XE->>FPS: execute(_EmptyTask)
Note over FPS: xenna_stage_spec() not defined<br/>Default: multiple workers
FPS->>XW: distribute _EmptyTask
Note over XW: Multiple workers compete<br/>for single task (inefficient)
XW->>FPS: process(_EmptyTask)
FPS->>FPS: Fan out to N FileGroupTasks
Note over XE,XW: After PR: One worker per node
XE->>FPS: execute(_EmptyTask)
Note over FPS: xenna_stage_spec() returns<br/>num_workers_per_node=1
FPS->>XW: assign to single worker
Note over XW: One worker processes<br/>single task (optimal)
XW->>FPS: process(_EmptyTask)
FPS->>FPS: Fan out to N FileGroupTasks
Note over UGS: URLGenerationStage follows<br/>same pattern
There was a problem hiding this comment.
Greptile Overview
Greptile Summary
This PR optimizes resource allocation for two fanout stages (FilePartitioningStage and URLGenerationStage) by configuring them to use exactly one worker per node in the Xenna executor.
Context: Both stages are "fanout" stages that start from _EmptyTask (a task with no actual data) and generate multiple output tasks. FilePartitioningStage scans file paths and creates FileGroupTask objects for parallel processing, while URLGenerationStage generates URLs and creates tasks for each URL. Since these stages perform lightweight orchestration rather than heavy data processing, they don't benefit from multiple workers per node.
Changes Made:
- Added
xenna_stage_spec()method toFilePartitioningStagethat returns{"num_workers_per_node": 1} - Added
xenna_stage_spec()method toURLGenerationStagethat returns{"num_workers_per_node": 1}
How it works: The Xenna executor calls stage.xenna_stage_spec() to get stage-specific configuration and passes the num_workers_per_node value to the underlying StageSpec (see nemo_curator/backends/xenna/executor.py, line 91). This configuration controls how many workers are allocated per node for that specific stage.
Consistency: This change follows the existing pattern used in other stages like DocumentDownloadStage (which uses the downloader's num_workers_per_node() method) and is architecturally similar to ImageDuplicatesRemovalStage (which conditionally sets this value).
Confidence Score: 5/5
- This PR is safe to merge with minimal risk
- The changes are minimal, well-scoped, and follow existing patterns in the codebase. Both modifications add the same simple method that returns a static configuration dictionary. The implementation matches the pattern used in DocumentDownloadStage and is architecturally sound. Since these are fanout stages starting from EmptyTask, limiting to one worker per node is the correct optimization. No logic changes, no edge cases introduced, and the code is straightforward.
- No files require special attention
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| nemo_curator/stages/file_partitioning.py | 5/5 | Added xenna_stage_spec() method returning {"num_workers_per_node": 1} to optimize resource allocation for fanout stage starting from EmptyTask |
| nemo_curator/stages/text/download/base/url_generation.py | 5/5 | Added xenna_stage_spec() method returning {"num_workers_per_node": 1} to optimize resource allocation for URL generation fanout stage |
Sequence Diagram
sequenceDiagram
participant Executor as XennaExecutor
participant FPS as FilePartitioningStage
participant UGS as URLGenerationStage
participant Xenna as Xenna Pipeline
Note over Executor: execute() called with stages
Executor->>FPS: xenna_stage_spec()
FPS-->>Executor: {"num_workers_per_node": 1}
Executor->>Xenna: Create StageSpec with num_workers_per_node=1
Note over Xenna: Allocates 1 worker per node for FPS
Executor->>FPS: process(_EmptyTask)
Note over FPS: Scans file paths<br/>Creates FileGroupTasks
FPS-->>Xenna: [FileGroupTask_0, FileGroupTask_1, ...]
Note over Xenna: Fanout: Multiple tasks generated<br/>from single EmptyTask
Executor->>UGS: xenna_stage_spec()
UGS-->>Executor: {"num_workers_per_node": 1}
Executor->>Xenna: Create StageSpec with num_workers_per_node=1
Note over Xenna: Allocates 1 worker per node for UGS
Executor->>UGS: process(_EmptyTask)
Note over UGS: Generates URLs<br/>Creates FileGroupTasks
UGS-->>Xenna: [FileGroupTask_0, FileGroupTask_1, ...]
Note over Xenna: Fanout: One task per URL
Description
Making URL Generator's / FilePartioningStage Xenna spec to be one worker per node, since these stages start from EmptyTask, we do not need multiple workers per stage for these
Usage
Checklist