Add workflow results #1275
Conversation
…dd-workflow-results Signed-off-by: Praateek <praateekm@gmail.com>
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.
Signed-off-by: Praateek <praateekm@gmail.com>
/ok to test af0787c
-    task._metadata.get("num_removal_ids", 0) for task in connected_components_tasks
+    task._metadata.get("num_removal_ids", 0) for task in (connected_components_tasks or [])
)
workflow_result.add_metadata("num_duplicates", num_removed_documents)
Nit: call it num_duplicates to match the metadata name and the field name used in the exact deduplication workflow.
-workflow_result.add_metadata("num_duplicates", num_removed_documents)
+workflow_result.add_metadata("num_duplicates", num_duplicates)
Renamed the variable to num_duplicates_identified, while the metadata key stays num_duplicates. For removal, the metadata key will be num_duplicates_removed.
output_tasks: list[FileGroupTask] | None = None
execution_time = 0.0
num_removed_duplicates = 0
Same here. Let's see if we can avoid unnecessary initializations in general, across the exact, fuzzy, semantic, and removal workflows?
Here they might not be unnecessary, since both execution_time and num_duplicates_removed are assigned inside a try block that might fail; without the initializations, the references after the block could be undefined.
Additional Comments (2)
- nemo_curator/stages/deduplication/fuzzy/workflow.py, line 373 (link) logic: id_generator_path undefined when no valid LSH tasks found. When valid_lsh_tasks is empty (line 332), the code skips the else block where id_generator_path is assigned (line 355), but still references it here, causing UnboundLocalError.
- nemo_curator/stages/deduplication/exact/workflow.py, line 257 (link) logic: id_generator_path undefined when assign_id=False. When self.assign_id is False, id_generator_path is never assigned (line 239 is inside if self.assign_id:), causing UnboundLocalError.
13 files reviewed, 2 comments
Additional Comments (6)
- nemo_curator/stages/deduplication/exact/workflow.py, line 257 (link) logic: id_generator_path undefined when self.assign_id is False
- nemo_curator/stages/deduplication/fuzzy/workflow.py, line 373 (link) logic: id_generator_path undefined when no duplicates found (len(valid_lsh_tasks) == 0)
- tests/stages/text/deduplication/test_removal_workflow.py, line 192 (link) logic: Metadata key mismatch: removal workflow sets num_duplicates_removed, not num_duplicates
- tests/stages/text/deduplication/test_removal_workflow.py, line 193 (link) logic: num_output_tasks never set in removal workflow metadata
- tests/stages/text/deduplication/test_removal_workflow.py, line 210 (link) logic: Metadata key mismatch: removal workflow sets num_duplicates_removed, not num_duplicates
- tests/stages/text/deduplication/test_removal_workflow.py, lines 282-283 (link) logic: Metadata key mismatches: removal workflow sets num_duplicates_removed, not num_duplicates, and never sets num_output_tasks
13 files reviewed, 6 comments
…dd-workflow-results Signed-off-by: Praateek <praateekm@gmail.com>
Greptile Overview
Greptile Summary
This PR refactors the workflow interfaces for deduplication pipelines to standardize outputs using a new WorkflowRunResult dataclass. The core refactoring is sound: introducing WorkflowRunResult and WorkflowBase provides good structure for workflow execution results.
Critical Issues Found
1. Exact Deduplication Workflow - Unbound Variable Error
The id_generator_path variable is referenced in workflow_summary (line 257) but is only defined inside the if self.assign_id: block (line 239). When assign_id=False, accessing id_generator_path will raise an UnboundLocalError.
2. Fuzzy Deduplication Workflow - Unbound Variable Error
Similar issue: id_generator_path is used in metadata at line 373 but only defined inside the else block (line 355) that executes when duplicates are found. When no duplicates are found, the variable is undefined, causing UnboundLocalError.
3. Text Removal Workflow - Metadata Key Inconsistency
The workflow sets metadata key num_duplicates_removed (line 212), but tests expect num_duplicates (3 occurrences at lines 192, 210, 282 in test_removal_workflow.py). This will cause test failures.
4. Text Semantic Deduplication - Metadata Key Mismatch
At line 519 in semantic.py, the code retrieves num_duplicates from removal_metadata, but the TextDuplicatesRemovalWorkflow actually sets num_duplicates_removed. This results in num_duplicates_removed being None instead of the actual count.
5. Text Removal Workflow - Missing Metadata
The workflow is missing the num_output_tasks metadata that tests expect (lines 193 and 283 in test_removal_workflow.py).
Files with Issues
- Critical: nemo_curator/stages/deduplication/exact/workflow.py (unbound variable)
- Critical: nemo_curator/stages/deduplication/fuzzy/workflow.py (unbound variable)
- Critical: tests/stages/text/deduplication/test_removal_workflow.py (metadata key mismatches)
- Critical: nemo_curator/stages/text/deduplication/semantic.py (metadata key mismatch)
- Important: nemo_curator/stages/text/deduplication/removal_workflow.py (missing metadata)
Positive Aspects
- WorkflowRunResult dataclass is well-designed with proper factory methods
- TaskPerfUtils correctly handles WorkflowRunResult alongside other input types
- Exact and Fuzzy deduplication test files properly validate the new return type
- Semantic deduplication workflow correctly integrates with the new structure
- Good metadata aggregation pattern across workflows
Confidence Score: 1/5
- This PR has critical runtime errors that will cause test failures and crashes. Multiple unbound variable errors and metadata key mismatches need to be fixed before merging.
- The PR introduces 5 critical issues: (1-2) UnboundLocalError in exact and fuzzy workflows when variables are accessed outside their definition scope, (3-4) Metadata key mismatches between workflow implementations and test expectations, and (5) Missing metadata fields. These issues will cause immediate test failures and runtime crashes. The core refactoring concept is sound, but implementation has fundamental bugs that prevent safe merge.
- Pay close attention to: exact/workflow.py (unbound id_generator_path), fuzzy/workflow.py (unbound id_generator_path), removal_workflow.py (metadata inconsistencies), and semantic.py (metadata key retrieval bug).
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| nemo_curator/stages/deduplication/exact/workflow.py | 2/5 | Critical issue: id_generator_path variable is used in workflow_summary at line 257 but only defined inside if self.assign_id block at line 239. When assign_id=False, UnboundLocalError will occur. |
| nemo_curator/stages/deduplication/fuzzy/workflow.py | 2/5 | Critical issue: id_generator_path variable is used in metadata at line 373 but only defined inside else block at line 355, which only executes when duplicates are found. When no duplicates found, UnboundLocalError will occur. |
| nemo_curator/stages/text/deduplication/removal_workflow.py | 3/5 | Two issues: 1) Missing num_output_tasks metadata that tests expect. 2) Setting num_duplicates_removed but downstream code and tests expect num_duplicates key in some contexts. |
| nemo_curator/stages/text/deduplication/semantic.py | 3/5 | Critical issue at line 519: tries to get num_duplicates from removal_metadata but removal workflow sets num_duplicates_removed. This causes num_duplicates_removed to be None instead of actual count. |
| tests/stages/text/deduplication/test_removal_workflow.py | 1/5 | Critical issues: Tests expect num_duplicates but workflow sets num_duplicates_removed (3 locations). Tests also expect num_output_tasks but workflow doesn't set it. |
Sequence Diagram
sequenceDiagram
participant User
participant Workflow as Exact/Fuzzy Workflow
participant Pipeline as Pipeline Execution
participant Result as WorkflowRunResult
User->>Workflow: run(initial_tasks)
alt assign_id = True (Exact) or always (Fuzzy)
Workflow->>Workflow: create_id_generator_actor()
end
Workflow->>Pipeline: Run identification pipeline
Pipeline-->>Workflow: removal_id_tasks
alt assign_id = True OR duplicates found
Workflow->>Workflow: id_generator_path = construct_path()
Workflow->>Workflow: write_id_generator_to_disk(id_generator_path)
else Bug: Variable undefined!
Workflow->>Workflow: ❌ UnboundLocalError on id_generator_path
end
Workflow->>Result: add_metadata("id_generator_path", id_generator_path)
Workflow-->>User: return workflow_result
workflow_summary = {
    "total_time": total_time,
    "num_duplicates": num_duplicates_identified,
    # paths
    "id_generator_path": id_generator_path,
}
workflow_result.extend_metadata(workflow_summary)
The variable id_generator_path is referenced in the workflow_summary at line 257, but it's only defined inside the if self.assign_id: block at line 239. If self.assign_id is False, this will raise an UnboundLocalError when the workflow_summary dictionary tries to access the undefined variable.
To fix this, initialize id_generator_path before the try block:
Suggested change:

def run(  # noqa: PLR0915
    self, initial_tasks: list[FileGroupTask] | None = None, executor: RayActorPoolExecutor | None = None
) -> WorkflowRunResult:
    """Run the deduplication pipeline.
    ...
    """
    self._validate_initial_tasks(initial_tasks)
    workflow_result = WorkflowRunResult(workflow_name="exact_deduplication")
    input_filegroups_time = 0.0
    identification_time = 0.0
    id_generator_path = None
Then update the workflow_summary to conditionally include it only when it's not None.
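To make the failure mode concrete, here is a minimal standalone sketch (hypothetical code, not the PR's) showing why referencing a conditionally assigned local raises UnboundLocalError, and how initializing it first plus conditional inclusion avoids both the crash and a null metadata entry:

```python
def run_without_init(assign_id: bool) -> dict:
    # Mirrors the bug: the variable only exists on one branch.
    if assign_id:
        id_generator_path = "/output/id_generator.json"  # hypothetical path
    # When assign_id is False, the next line raises UnboundLocalError.
    return {"id_generator_path": id_generator_path}


def run_with_init(assign_id: bool) -> dict:
    id_generator_path = None  # initialize before any conditional assignment
    if assign_id:
        id_generator_path = "/output/id_generator.json"
    summary = {}
    if id_generator_path is not None:  # include the key only when meaningful
        summary["id_generator_path"] = id_generator_path
    return summary
```

The first function fails only on the assign_id=False path, which is exactly why the bug survives tests that always enable ID assignment.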
workflow_result.add_metadata("total_time", total_time)
workflow_result.extend_metadata(
    {
        "total_time": total_time,
        # paths
        "id_generator_path": id_generator_path,
    }
)
The variable id_generator_path is referenced in the metadata at line 373, but it's only defined inside the else block (line 355) that executes only when duplicates are found. If no duplicates are found (len(valid_lsh_tasks) == 0), id_generator_path will be undefined, causing an UnboundLocalError.
To fix this, initialize id_generator_path before the try block:
Suggested change:

total_start_time = time.time()
try:
    create_id_generator_actor()
except ValueError:
    err_msg = """
An existing id generator actor was found. Please remove or save the existing id generator with
`nemo_curator.stages.deduplication.id_generator.write_id_generator_to_disk` (if needed) and remove the actor with
`nemo_curator.stages.deduplication.id_generator.kill_id_generator_actor` before running the fuzzy deduplication pipeline.
"""
    raise RuntimeError(err_msg) from None
id_generator_path = None
Then update the metadata extension to conditionally include id_generator_path only when it's not None.
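One generic way to implement that "only when it's not None" filtering is a small helper applied before extend_metadata; a sketch under the assumption that extend_metadata accepts a plain dict (the helper name is hypothetical):

```python
from typing import Any


def non_null_metadata(summary: dict[str, Any]) -> dict[str, Any]:
    """Drop None-valued keys so optional paths are omitted rather than recorded as null."""
    return {key: value for key, value in summary.items() if value is not None}
```

Usage would be workflow_result.extend_metadata(non_null_metadata({"total_time": total_time, "id_generator_path": id_generator_path})), so a run without an id generator simply omits the key.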
Additional Comments (1)
Greptile Overview
Greptile Summary
This PR refactors deduplication workflow interfaces to standardize outputs using a new WorkflowRunResult dataclass, replacing inconsistent return types (None, dict) with a unified structure. The changes include:
Core Changes
- Introduced WorkflowRunResult dataclass and WorkflowBase abstract class in nemo_curator/pipeline/workflow.py to standardize workflow outputs
- Updated all deduplication workflows (exact, fuzzy, semantic) to inherit from WorkflowBase and return WorkflowRunResult
- Extended TaskPerfUtils to accept WorkflowRunResult inputs for metrics aggregation
- Added comprehensive test coverage for the new result structure
Critical Issues Found
1. Exact Deduplication Workflow - Undefined Variable (Line 257)
When assign_id=False, the variable id_generator_path is never initialized but is unconditionally used in the workflow_summary dictionary, causing a NameError at runtime.
2. Fuzzy Deduplication Workflow - Undefined Variable (Line 373)
When no LSH tasks are found (line 332), the code skips the else block where id_generator_path is defined, but then tries to use it unconditionally in extend_metadata, causing a NameError.
3. Text Duplicates Removal Workflow - Missing Metadata Field
The workflow doesn't set the num_output_tasks metadata field that:
- Tests explicitly check for (lines 193, 283 in test_removal_workflow.py)
- The TextSemanticDeduplicationWorkflow depends on (line 521 in semantic.py)
Impact
- Exact workflow: will crash when assign_id=False is used
- Fuzzy workflow: will crash when no duplicates are found
- Removal workflow: tests will fail because expected metadata is missing; downstream code expecting num_output_tasks will receive None
Files with Issues
- nemo_curator/stages/deduplication/exact/workflow.py: variable scope bug
- nemo_curator/stages/deduplication/fuzzy/workflow.py: variable scope bug
- nemo_curator/stages/text/deduplication/removal_workflow.py: missing metadata field
- Tests expect metadata that workflows don't provide
Confidence Score: 0/5
- This PR introduces critical runtime errors that will cause crashes in multiple workflows under specific conditions. Not safe to merge.
- Three critical bugs found: (1) Exact workflow crashes when assign_id=False due to undefined id_generator_path variable; (2) Fuzzy workflow crashes when no duplicates found due to undefined id_generator_path variable; (3) Removal workflow missing num_output_tasks metadata causing test failures and breaking dependent code. These are blocking issues that will cause production failures and test failures.
- nemo_curator/stages/deduplication/exact/workflow.py, nemo_curator/stages/deduplication/fuzzy/workflow.py, nemo_curator/stages/text/deduplication/removal_workflow.py
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| nemo_curator/pipeline/workflow.py | 5/5 | Introduced WorkflowRunResult dataclass and WorkflowBase abstract class to standardize workflow outputs. Implementation is clean and properly structured with helper methods for managing pipeline tasks and metadata. |
| nemo_curator/stages/deduplication/exact/workflow.py | 1/5 | Returns WorkflowRunResult with detailed timing and duplicate counts. CRITICAL BUG: Variable id_generator_path undefined when assign_id=False causes NameError on line 257 when accessing workflow_summary dictionary. |
| nemo_curator/stages/deduplication/fuzzy/workflow.py | 1/5 | Returns WorkflowRunResult with timing and duplicate counts. CRITICAL BUG: Variable id_generator_path undefined when no LSH tasks are present (line 332) causes NameError on line 373 when accessing extend_metadata call. |
| nemo_curator/stages/text/deduplication/removal_workflow.py | 2/5 | Returns WorkflowRunResult with removal timing and duplicate counts. CRITICAL BUG: Missing num_output_tasks metadata field that tests expect (lines 193, 283) and that semantic.py depends on (line 521). |
| tests/stages/text/deduplication/test_removal_workflow.py | 1/5 | Integration tests verify removal workflow with WorkflowRunResult. CRITICAL BUG: Tests expect num_output_tasks metadata (lines 193, 283) but workflow doesn't provide it, causing test failures. |
Sequence Diagram
sequenceDiagram
participant TestCode as Test/User
participant ExactWorkflow as ExactDeduplicationWorkflow
participant FuzzyWorkflow as FuzzyDeduplicationWorkflow
participant RemovalWorkflow as TextDuplicatesRemovalWorkflow
participant Result as WorkflowRunResult
TestCode->>ExactWorkflow: run(assign_id=True/False)
activate ExactWorkflow
ExactWorkflow->>Result: WorkflowRunResult("exact_deduplication")
Note over ExactWorkflow: Run pipeline stages
alt assign_id=True
ExactWorkflow->>ExactWorkflow: id_generator_path = os.path.join(...)
ExactWorkflow->>ExactWorkflow: write_id_generator_to_disk()
else assign_id=False
Note over ExactWorkflow: id_generator_path UNDEFINED!
end
ExactWorkflow->>ExactWorkflow: workflow_summary["id_generator_path"] = id_generator_path
Note over ExactWorkflow: ❌ NameError if assign_id=False
ExactWorkflow->>Result: extend_metadata(workflow_summary)
ExactWorkflow-->>TestCode: WorkflowRunResult
deactivate ExactWorkflow
TestCode->>FuzzyWorkflow: run()
activate FuzzyWorkflow
FuzzyWorkflow->>Result: WorkflowRunResult("fuzzy_deduplication")
alt valid_lsh_tasks exist
FuzzyWorkflow->>FuzzyWorkflow: id_generator_path = output_fs.sep.join(...)
FuzzyWorkflow->>FuzzyWorkflow: write_id_generator_to_disk()
else no valid_lsh_tasks
Note over FuzzyWorkflow: id_generator_path UNDEFINED!
end
FuzzyWorkflow->>Result: extend_metadata(...id_generator_path)
Note over FuzzyWorkflow: ❌ NameError if no LSH tasks
FuzzyWorkflow-->>TestCode: WorkflowRunResult
deactivate FuzzyWorkflow
TestCode->>RemovalWorkflow: run()
activate RemovalWorkflow
RemovalWorkflow->>Result: WorkflowRunResult("text_duplicates_removal")
Note over RemovalWorkflow: Run pipeline
RemovalWorkflow->>RemovalWorkflow: num_duplicates_removed = count_removed()
RemovalWorkflow->>Result: add_metadata("num_duplicates_removed", count)
Note over RemovalWorkflow: Missing: num_output_tasks
RemovalWorkflow-->>TestCode: WorkflowRunResult
deactivate RemovalWorkflow
TestCode->>TestCode: assert result.get_metadata("num_output_tasks")<br/>== len(output_tasks)
Note over TestCode: ❌ Test FAILS: num_output_tasks is None
Greptile Overview
Greptile Summary
This PR refactors the deduplication workflow interfaces to standardize outputs using a new WorkflowRunResult dataclass and WorkflowBase abstract class. All deduplication workflows (exact, fuzzy, semantic) now return consistent results with pipeline tasks and timing metadata.
Key Changes:
- Introduced WorkflowRunResult dataclass to encapsulate workflow outputs, pipeline tasks, and metadata
- Added WorkflowBase abstract class requiring workflows to implement run() returning WorkflowRunResult
- Updated all workflow run() methods to collect timing data and return WorkflowRunResult objects
- Extended TaskPerfUtils to accept WorkflowRunResult inputs for metrics aggregation
Critical Issues Found:
- Exact workflow: undefined variable error when exceptions occur; num_duplicates_identified is defined inside the try block but used after the finally block
- Fuzzy workflow: undefined variable error in the no-duplicates case; id_generator_path is only set when duplicates are found but is always referenced in metadata
- Fuzzy workflow: duplicate metadata key; total_time is set twice (lines 369 and 372)
- Text semantic workflow: attempts to read num_output_tasks from removal workflow metadata, but this key is never set by the removal workflow
Moderate Issues:
- Exact workflow: id_generator_path is unconditionally added to metadata even when None, which may be confusing
The refactoring provides a cleaner, more consistent API, but the critical bugs will cause runtime failures in specific scenarios (exception handling, no-duplicates cases).
Confidence Score: 2/5
- This PR has critical bugs that will cause runtime failures in edge cases and normal scenarios
- Score reflects multiple critical issues: (1) Undefined variable errors in exact workflow when exceptions occur, (2) Undefined variable errors in fuzzy workflow when no duplicates are found (a valid use case), (3) Missing metadata key causing None values in text semantic workflow. These bugs will cause NameError exceptions in production. Additionally, there's a logic bug with duplicate metadata keys. The refactoring concept is good, but implementation has several execution paths that weren't properly tested.
- Pay close attention to nemo_curator/stages/deduplication/exact/workflow.py (undefined variable in error paths), nemo_curator/stages/deduplication/fuzzy/workflow.py (undefined variable in no-duplicates case, duplicate metadata key), and nemo_curator/stages/text/deduplication/semantic.py (references non-existent metadata key)
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| nemo_curator/stages/deduplication/exact/workflow.py | 2/5 | Refactored run() to return WorkflowRunResult with timing metadata. Critical issue: id_generator_path can be None in error paths but is always added to metadata without conditional check. |
| nemo_curator/stages/deduplication/fuzzy/workflow.py | 3/5 | Refactored run() to return WorkflowRunResult with timing metadata. Properly handles no-duplicates case by skipping connected components. Metadata duplicate on line 370-375 (total_time set twice). |
| nemo_curator/stages/deduplication/semantic/workflow.py | 4/5 | Refactored run() to return WorkflowRunResult with timing metadata. Properly handles optional duplicate identification stage based on eps parameter. No critical issues found. |
| nemo_curator/stages/text/deduplication/removal_workflow.py | 5/5 | Refactored to inherit from WorkflowBase and return WorkflowRunResult. Metadata correctly includes num_duplicates_removed. Implementation looks solid. |
| nemo_curator/stages/text/deduplication/semantic.py | 3/5 | Monolithic workflow for text semantic deduplication. Issue found: tries to read num_output_tasks from removal workflow metadata but that key is never set by the removal workflow. |
| nemo_curator/pipeline/workflow.py | 5/5 | New WorkflowRunResult dataclass and WorkflowBase abstract class added. Clean implementation for standardizing workflow outputs with pipeline tasks and metadata. |
Sequence Diagram
sequenceDiagram
participant User
participant Workflow as ExactDeduplicationWorkflow
participant Pipeline as Pipeline
participant Result as WorkflowRunResult
participant Executor as RayActorPoolExecutor
User->>Workflow: run(initial_tasks, executor)
Workflow->>Workflow: _validate_initial_tasks()
Workflow->>Result: WorkflowRunResult(workflow_name)
alt assign_id is True
Workflow->>Workflow: create_id_generator_actor()
end
alt initial_tasks is None
Workflow->>Pipeline: _create_input_filegroups()
Pipeline->>Executor: run()
Executor-->>Pipeline: input tasks
Pipeline-->>Workflow: initial_tasks
Workflow->>Result: add_metadata("input_filegroups_time")
Workflow->>Result: add_pipeline_tasks("input_filegroups")
end
Workflow->>Pipeline: _create_identification_pipeline()
Pipeline->>Executor: run(initial_tasks)
Executor-->>Pipeline: removal_id_tasks
Pipeline-->>Workflow: removal_id_tasks
Workflow->>Result: add_metadata("identification_time")
Workflow->>Result: add_pipeline_tasks("identification")
alt assign_id is True
Workflow->>Workflow: write_id_generator_to_disk()
end
Workflow->>Workflow: kill_id_generator_actor()
Workflow->>Result: extend_metadata(workflow_summary)
Workflow-->>User: WorkflowRunResult
@@ -227,8 +245,18 @@ def run(
        else None,
    )
    logger.info(f"Id generator written to {id_generator_path}")
    end_time = time.time()
    logger.info(f"Exact deduplication pipeline completed in {(end_time - start_time):.2f} seconds")
finally:
    if self.assign_id:
        kill_id_generator_actor()

total_end_time = time.time()
total_time = total_end_time - total_start_time
workflow_summary = {
    "total_time": total_time,
    "num_duplicates": num_duplicates_identified,
num_duplicates_identified is defined inside the try block (line 233) but referenced outside the try block in the workflow_summary dictionary (line 256). If an exception occurs anywhere in the try block before line 233 (e.g., during input file group creation, pipeline initialization, or identification), this will cause a NameError: name 'num_duplicates_identified' is not defined when attempting to construct the workflow_summary.
Fix: Initialize num_duplicates_identified = 0 before the try block (around line 188 where other timing variables are initialized).
workflow_result = WorkflowRunResult(workflow_name="exact_deduplication")
input_filegroups_time = 0.0
identification_time = 0.0
num_duplicates_identified = 0
@@ -343,7 +361,17 @@ def run(
        else None,
    )
    logger.info(f"Id generator written to {id_generator_path}")
The id_generator is only written to disk when duplicates are found (lines 356-363, inside the else block). This creates an inconsistency with the exact deduplication workflow, which always writes the id_generator if it was created, regardless of whether duplicates were found.
Problem: If no duplicates are found (len(valid_lsh_tasks) == 0), the id_generator is created and used to assign IDs during the minhash stage, but never persisted to disk. This means:
- The id_generator_path in metadata will be None
- The behavior is inconsistent with ExactDeduplicationWorkflow
Expected behavior: The id_generator should be written to disk after the if/else block (similar to exact workflow lines 239-247), regardless of whether duplicates were found, since IDs were assigned during the minhash stage.
valid_lsh_tasks = [task for task in lsh_tasks or [] if task._metadata.get("num_docs", 0) > 0]
if len(valid_lsh_tasks) == 0:
    logger.info("No potential duplicates found in the dataset. Skipping connected components pipeline.")
    workflow_result.add_metadata("num_duplicates", 0)
else:
    connected_components_pipeline = self._create_connected_components_pipeline()
    connected_components_start_time = time.time()
    connected_components_tasks = connected_components_pipeline.run(
        executor=executor, initial_tasks=valid_lsh_tasks
    )
    connected_components_end_time = time.time()
    connected_components_time = connected_components_end_time - connected_components_start_time
    workflow_result.add_pipeline_tasks("connected_components", connected_components_tasks)
    workflow_result.add_metadata("connected_components_time", connected_components_time)
    logger.info(f"Connected components pipeline completed in {connected_components_time:.2f} seconds")
    num_duplicates_identified = sum(
        task._metadata.get("num_removal_ids", 0) for task in (connected_components_tasks or [])
    )
    workflow_result.add_metadata("num_duplicates", num_duplicates_identified)
    logger.info(f"Number of documents removed: {num_duplicates_identified}")
# Write id_generator regardless of whether duplicates were found
output_fs = get_fs(
    self.output_path,
    self.write_kwargs.get("storage_options") if self.write_kwargs is not None else None,
)
id_generator_path = output_fs.sep.join([self.output_path, ID_GENERATOR_OUTPUT_FILENAME])
write_id_generator_to_disk(
    id_generator_path,
    storage_options=self.write_kwargs.get("storage_options")
    if self.write_kwargs is not None
    else None,
)
logger.info(f"Id generator written to {id_generator_path}")
Additional Comments (1)
The except block at lines 543-545 re-raises the exception, but the code at lines 547-562 executes regardless. This means if an exception occurs after embedding_start_time (line 469) but before embedding_time is calculated (line 472), the variable will be undefined. Fix: initialize all timing variables before the try block. Alternatively, move the metadata construction (lines 547-561) inside the try block after line 526, or use an else block as SemanticDeduplicationWorkflow does.
@@ -227,8 +245,18 @@ def run(
        else None,
    )
    logger.info(f"Id generator written to {id_generator_path}")
    end_time = time.time()
    logger.info(f"Exact deduplication pipeline completed in {(end_time - start_time):.2f} seconds")
finally:
    if self.assign_id:
        kill_id_generator_actor()

total_end_time = time.time()
total_time = total_end_time - total_start_time
workflow_summary = {
    "total_time": total_time,
    "num_duplicates": num_duplicates_identified,
The variable num_duplicates_identified is defined inside the try block (line 233) but is referenced outside the try-finally blocks at line 256 in workflow_summary. If an exception occurs before line 233 is executed (e.g., during input filegroups pipeline or identification pipeline setup), the variable will be undefined, causing a NameError when line 256 tries to access it.
Fix: Initialize num_duplicates_identified = 0 before the try block (after line 212):
id_generator_path = None
num_duplicates_identified = 0
try:
Additional Comments (2)
If an exception occurs during workflow execution before these variables are assigned (e.g., during embedding generation, semantic deduplication, or removal), the code at lines 547-562 (which executes after the except block re-raises) will attempt to access undefined variables, causing a NameError. The except block at lines 543-545 re-raises the exception, but lines 547-562 still execute afterward. Fix: initialize all timing variables before the try block.
This creates inconsistency in the workflow interface: all workflows should inherit from WorkflowBase. Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
ayushdg
left a comment
change lgtm! Just minor questions
return self.metadata.get(key)

class WorkflowBase(ABC):
Curious about the need for this class. Is it to add more stuff while expanding in the future?
yup basically
workflow_result = WorkflowRunResult(workflow_name="fuzzy_deduplication")
minhash_time = 0.0
lsh_time = 0.0
connected_components_time = 0.0
nit: can we call this cc_pipeline_time or connected_components_pipeline_time? The nit is because the pipeline does much more than connected components (buckets to edges, connected components, shuffle on components, etc.).
result = workflow.run(initial_tasks=exact_dedup_data_parquet)
assert result.pipeline_tasks
assert result.get_metadata("total_time") > 0
assert (result.get_metadata("num_duplicates") or 0) > 0
Not blocking here but we can go one step further and probably assert this matches a specific value
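The stricter check the reviewer suggests could look like the following sketch; the expected count is a hypothetical stand-in, since the real value depends on how the test dataset is constructed:

```python
# Hypothetical: EXPECTED_NUM_DUPLICATES would come from how the test
# fixture is built, not from anything in this PR.
EXPECTED_NUM_DUPLICATES = 4


def assert_exact_duplicate_count(result) -> None:
    # Pin the metadata to the known value instead of only asserting > 0,
    # so regressions in the counting logic are caught, not just zeros.
    assert result.get_metadata("num_duplicates") == EXPECTED_NUM_DUPLICATES
    assert result.get_metadata("total_time") > 0
```

Pinning an exact count catches off-by-one or double-counting bugs that a simple "greater than zero" assertion would let through.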
…hajan/NeMo-Curator into praateek/add-workflow-results Signed-off-by: Praateek <praateekm@gmail.com>
num_duplicates_identified = sum(
    task._metadata.get("num_removal_ids", 0) for task in removal_id_tasks or []
)
if num_duplicates_identified == 0:
The variable num_duplicates_identified is defined inside the try block (lines 233-235) but is referenced outside it in the workflow_summary at line 256. If an exception occurs before this variable is assigned (e.g., during input file group creation or id generator setup), the code will throw a NameError when trying to access num_duplicates_identified in line 256.
Fix: Initialize num_duplicates_identified = 0 before the try block at line 213, similar to how input_filegroups_time and identification_time are initialized at lines 187-188.
| num_duplicates_identified = sum( |
| task._metadata.get("num_removal_ids", 0) for task in removal_id_tasks or [] |
| ) |
| if num_duplicates_identified == 0: |
| id_generator_path = None |
| num_duplicates_identified = 0 |
| try: |
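The failure mode is easy to reproduce in isolation. A minimal sketch (hypothetical names mirroring the pattern described in the comment, not the workflow code itself):

```python
def run_identification(fail_early):
    # Fix: bind the name before the try block so the summary below is
    # always well-defined, even when an early stage raises.
    num_duplicates_identified = 0
    try:
        if fail_early:
            raise RuntimeError("input file group creation failed")
        removal_id_tasks = [{"num_removal_ids": 3}, {"num_removal_ids": 2}]
        num_duplicates_identified = sum(
            t.get("num_removal_ids", 0) for t in removal_id_tasks or []
        )
    except RuntimeError:
        pass  # illustration only; the real workflow logs and re-raises
    # Without the initialization above, the early-failure path would hit
    # a NameError on the line below.
    return {"num_duplicates_identified": num_duplicates_identified}
```

Initializing before `try` is the same pattern the workflow already uses for input_filegroups_time and identification_time.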
Additional Comments (1)
The Fix: Initialize all timing variables before the try block.
| @@ -466,6 +470,7 @@ def run( # noqa: C901, PLR0912, PLR0915 |
| embedding_results = self._run_embedding_generation(embedding_executor) |
| embedding_end_time = time.time() |
| embedding_time = embedding_end_time - embedding_start_time |
| workflow_result.add_pipeline_tasks("embeddings", embedding_results) |
| logger.success(f"Embedding generation completed in {embedding_time:.2f} seconds") |
| if self.use_id_generator: |
| @@ -488,17 +493,31 @@ def run( # noqa: C901, PLR0912, PLR0915 |
| ) |
| semantic_end_time = time.time() |
| semantic_time = semantic_end_time - semantic_start_time |
| # Merge pipeline tasks from semantic_results |
| for pipeline_name, tasks in semantic_results.pipeline_tasks.items(): |
| workflow_result.add_pipeline_tasks(pipeline_name, tasks) |
| # Preserve semantic stage metadata without clobbering keys from other stages |
| semantic_metadata = semantic_results.metadata or {} |
| workflow_result.add_metadata("kmeans_time", semantic_metadata.get("kmeans_time")) |
| workflow_result.add_metadata("pairwise_time", semantic_metadata.get("pairwise_time")) |
| num_duplicates_identified = semantic_metadata.get("num_duplicates", 0) or 0 |
| workflow_result.add_metadata("num_duplicates", num_duplicates_identified) |
| logger.success(f"Semantic deduplication completed in {semantic_time:.2f} seconds") |
| # Stage 3: Duplicate removal (optional) |
| removal_results = [] |
| removal_time = 0.0 |
| if self.perform_removal: |
| removal_start_time = time.time() |
| removal_results = self._run_duplicate_removal(removal_executor) |
| removal_end_time = time.time() |
| removal_time = removal_end_time - removal_start_time |
| if removal_results is not None: |
| for pipeline_name, tasks in removal_results.pipeline_tasks.items(): |
| workflow_result.add_pipeline_tasks(pipeline_name, tasks) |
| removal_metadata = removal_results.metadata or {} |
| num_duplicates_removed = removal_metadata.get("num_duplicates_removed") |
| workflow_result.add_metadata("num_duplicates_removed", num_duplicates_removed) |
| logger.success(f"Duplicate removal completed in {removal_time:.2f} seconds") |
| @@ -515,26 +534,29 @@ def run( # noqa: C901, PLR0912, PLR0915 |
| logger.info(f"Embedding generation time: {embedding_time:.2f} seconds") |
| logger.info(f"Semantic deduplication time: {semantic_time:.2f} seconds") |
| if self.perform_removal: |
| logger.info(f"Duplicate removal time: {removal_time:.2f} seconds (removed {removal_results} rows)") |
| if semantic_results.get("total_duplicates_identified", 0) > 0: |
| logger.success( |
| f"Total documents identified as duplicates: {semantic_results['total_duplicates_identified']}" |
| ) |
| logger.info(f"Duplicate removal time: {removal_time:.2f} seconds") |
| num_duplicates_identified = semantic_results.get_metadata("num_duplicates") or 0 |
| if num_duplicates_identified > 0: |
| logger.success(f"Total documents identified as duplicates: {num_duplicates_identified:,}") |
| logger.success("=" * 80) |
| except Exception as e: |
| logger.error(f"Text semantic deduplication workflow failed: {e}") |
| raise |
| return { |
| "total_execution_time": total_time, |
| "embedding_execution_time": embedding_time, |
| "semantic_execution_time": semantic_time, |
| "removal_execution_time": removal_time, |
| "embedding_results": embedding_results, |
| "semantic_results": semantic_results, |
| "removal_results": removal_results, |
| "embeddings_path": self.embeddings_path, |
| "semantic_dedup_path": self.semantic_dedup_path, |
| "final_output_path": self.deduplicated_output_path if self.perform_removal else None, |
| } |
| # Record consolidated metadata with clear, non-overlapping keys |
| workflow_result.extend_metadata( |
| { |
| "total_time": total_time, |
| # Stage timings |
| "embedding_time": embedding_time, |
| "identification_time": semantic_time, |
| "removal_time": removal_time, |
| # paths |
| "embeddings_path": self.embeddings_path, |
| "semantic_dedup_path": self.semantic_dedup_path, |
| "final_output_path": self.deduplicated_output_path if self.perform_removal else None, |
| "id_generator_path": self.id_generator_state_file if self.use_id_generator else None, |
| } |
| ) |
| return workflow_result |
[P0] Timing variables undefined on exception path
Multiple timing variables (embedding_time, semantic_time, removal_time, total_time) are defined inside the try block (lines 451-541) but are referenced outside it in the metadata dictionary construction (lines 547-562).
The except block at lines 543-545 re-raises exceptions, but the code at lines 547-562 executes afterward regardless. If an exception occurs at any point during workflow execution before these variables are assigned, accessing them will cause a NameError.
For example, if an exception occurs during embedding generation (after line 469 but before line 472), embedding_time will be undefined when line 552 tries to use it.
Fix: Initialize all timing variables before the try block:
| total_start_time = time.time() |
| workflow_result = WorkflowRunResult(workflow_name="text_semantic_deduplication") |
| num_duplicates_identified = 0 |
| embedding_time = 0.0 |
| semantic_time = 0.0 |
| removal_time = 0.0 |
| total_time = 0.0 |
| try: |
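The suggested pattern, in runnable form (the stage bodies here are stand-ins, and the exception is swallowed purely for illustration, whereas the real workflow logs and re-raises):

```python
import time


def run_stages(fail_in_semantic):
    # All timing slots are bound before the try block, so the summary
    # below is well-defined no matter where a stage fails.
    embedding_time = 0.0
    semantic_time = 0.0
    removal_time = 0.0
    total_start_time = time.time()
    try:
        t0 = time.time()
        # ... embedding stage would run here ...
        embedding_time = time.time() - t0
        if fail_in_semantic:
            raise RuntimeError("semantic stage failed")
        t0 = time.time()
        # ... semantic stage would run here ...
        semantic_time = time.time() - t0
    except RuntimeError:
        pass  # illustration only; the real workflow re-raises
    return {
        "total_time": time.time() - total_start_time,
        "embedding_time": embedding_time,
        "semantic_time": semantic_time,
        "removal_time": removal_time,
    }
```

On the failure path the unreached stages simply report 0.0 instead of raising NameError when the summary dictionary is built.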
| total_start_time = time.time() |
| workflow_result = WorkflowRunResult(workflow_name="text_semantic_deduplication") |
| num_duplicates_identified = 0 |
Timing variables (embedding_time, semantic_time, removal_time, total_time) are defined inside the try block (lines 472, 495, 509, 526) but referenced after the except block (lines 550-554); if an exception occurs before assignment, a NameError will occur.
| total_start_time = time.time() |
| workflow_result = WorkflowRunResult(workflow_name="text_semantic_deduplication") |
| num_duplicates_identified = 0 |
| embedding_time = 0.0 |
| semantic_time = 0.0 |
| removal_time = 0.0 |
| total_time = 0.0 |
Description
This pull request refactors the workflow interfaces for the deduplication pipelines (exact, fuzzy, and semantic) to standardize their outputs and improve usability.
Core API and Interface Refactoring
Added a WorkflowRunResult dataclass in nemo_curator/pipeline/workflow.py to encapsulate workflow outputs, pipeline task mappings, and metadata. Also added an abstract WorkflowBase class to standardize workflow interfaces.
Updated the deduplication workflows (ExactDeduplicationWorkflow, FuzzyDeduplicationWorkflow, SemanticDeduplicationWorkflow) to inherit from WorkflowBase and to return a WorkflowRunResult from their run methods, instead of returning None or a dictionary.
Workflow Output and Metadata Improvements
Updated the run methods of all workflows to collect and record detailed timing and result metadata (such as per-stage execution times and duplicate counts) into the WorkflowRunResult object.
Usage
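Pending the author's snippet, a self-contained sketch of how a caller might consume the new result object (the dataclass below is a stand-in mirroring the one added in nemo_curator/pipeline/workflow.py, so the snippet runs without the package installed; the workflow call and metadata values are hypothetical):

```python
from dataclasses import dataclass, field


@dataclass
class WorkflowRunResult:
    """Stand-in for the result object returned by workflow run() methods."""

    workflow_name: str
    pipeline_tasks: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)

    def get_metadata(self, key):
        return self.metadata.get(key)


# In real code this would come from a workflow, e.g.:
#   result = ExactDeduplicationWorkflow(...).run(initial_tasks=...)
result = WorkflowRunResult(
    workflow_name="exact_deduplication",
    metadata={"total_time": 12.3, "num_duplicates": 128},
)

summary = (
    f"{result.workflow_name}: {result.get_metadata('num_duplicates')} "
    f"duplicates in {result.get_metadata('total_time'):.1f}s"
)
print(summary)  # exact_deduplication: 128 duplicates in 12.3s
```

Callers read per-stage timings and duplicate counts through get_metadata instead of indexing an ad-hoc dictionary, which is the usability improvement this PR targets.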
# Add snippet demonstrating usage
Checklist