Skip to content

Fixing multiple state creation while unites#257

Merged
NiveditJain merged 17 commits intoexospherehost:mainfrom
NiveditJain:fixing-multiple-branch-creation
Aug 22, 2025
Merged

Fixing multiple state creation while unites#257
NiveditJain merged 17 commits intoexospherehost:mainfrom
NiveditJain:fixing-multiple-branch-creation

Conversation

@NiveditJain
Copy link
Member

No description provided.

- Introduced an asynchronous function to check if a state already exists before inserting it into the database.
- Updated the create_next_states function to gather existence checks for new unit states and only insert those that do not already exist.
- This enhancement improves efficiency by preventing duplicate state entries in the database.
- Updated the check_state_exists function to return a State instance or None instead of a boolean value, enhancing type clarity.
- Adjusted the create_next_states function to utilize the new return type, ensuring only valid states are appended to the not_inserted_new_states list.
- These changes improve the overall type safety and readability of the state management logic.
- Updated the check_state_exists function to use find_one instead of find, improving clarity and efficiency in checking for existing states.
- Enhanced the condition to return None if the state does not exist, aligning with the recent type safety improvements in the state management logic.
- Introduced a new _fingerprint field in the State model, with a method to generate its value based on relevant state attributes.
- Added a does_unites field to indicate if the state unites others.
- Updated create_next_states function to handle DuplicateKeyError during state insertion, improving error management and preventing crashes from duplicate entries.
- Removed the previous existence check for states, streamlining the state creation process and enhancing efficiency.
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Aug 22, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Warning

Rate limit exceeded

@NiveditJain has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 4 minutes and 16 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between d0305bc and 42d933c.

📒 Files selected for processing (3)
  • state-manager/app/controller/enqueue_states.py (2 hunks)
  • state-manager/app/tasks/create_next_states.py (4 hunks)
  • state-manager/tests/unit/controller/test_enqueue_states.py (8 hunks)
📝 Walkthrough

Summary by CodeRabbit

  • New Features
    • Ensures unique “uniting” states via automatic fingerprinting, preventing duplicates and improving consistency.
  • Bug Fixes
    • Handles rare duplicate-key race conditions when creating uniting states, reducing failures and noisy retries.
  • Refactor
    • Parallelized state enqueuing for higher throughput and faster queueing under load, with improved resilience to transient errors.
  • Tests
    • Updated unit tests to cover the new enqueueing flow, uniqueness guarantees, and error handling scenarios.

Walkthrough

Adds deterministic fingerprinting for states that unite other states (new fields and a before-event hook), marks generated unit states with does_unites, catches DuplicateKeyError on unit-state inserts, and parallelizes enqueuing via atomic per-state find_one_and_update calls.

Changes

Cohort / File(s) Summary of changes
State fingerprinting & index
state-manager/app/models/db/state.py
Added does_unites: bool and state_fingerprint: str fields; added @before_event([Insert, Replace, Save]) _generate_fingerprint that canonicalizes parents+identifiers and stores SHA‑256 hex digest when does_unites is true (clears otherwise); added partial unique IndexModel on state_fingerprint with {"does_unites": True}; new imports (IndexModel, hashlib, json, Beanie events).
Unit state creation (race handling)
state-manager/app/tasks/create_next_states.py
Mark generated unit states with does_unites when template unites is present; wrap insert_many of unit states in try/except DuplicateKeyError to log and ignore race-condition duplicate-key errors; small formatting tweaks.
Enqueuing: atomic claim & parallelization
state-manager/app/controller/enqueue_states.py
Added `async def find_state(namespace_name: str, nodes: list[str]) -> State
Tests updated for new enqueuing path
state-manager/tests/unit/controller/test_enqueue_states.py
Refactored tests to mock State.get_pymongo_collection().find_one_and_update() and assert batch-driven behavior, per-call invocation counts, and error handling consistent with new atomic/parallel enqueuing.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Scheduler
  participant Enqueuer as enqueue_states
  participant Finder as find_state (atomic)
  participant DB as MongoDB
  Note over Enqueuer,Finder: Parallelized per-state atomic claim
  Scheduler->>Enqueuer: request enqueue (namespace, nodes, batch_size)
  loop batch_size times (concurrent)
    Enqueuer->>Finder: find_state(namespace, nodes)
    Finder->>DB: find_one_and_update(filter: CREATED & node in nodes -> set QUEUED)
    alt matched
      DB-->>Finder: matched State doc
      Finder-->>Enqueuer: State
    else none / error
      DB-->>Finder: None / error
      Finder--x Enqueuer: None / exception
    end
  end
  Enqueuer->>Scheduler: aggregated queued states (skipping failures)
Loading
sequenceDiagram
  autonumber
  actor TaskWorker
  participant Creator as create_next_states
  participant StateModel as State (Beanie)
  participant Hook as before_event Hook
  participant DB as MongoDB

  TaskWorker->>Creator: generate_next_state(...)
  Creator->>StateModel: instantiate State(..., does_unites=True)
  StateModel-->>Hook: pre-insert/replace/save event
  Hook->>Hook: canonicalize parents -> SHA-256 -> set state_fingerprint
  Hook-->>StateModel: document with fingerprint
  Creator->>DB: insert_many(unit_states)
  alt DuplicateKeyError (concurrent insert)
    DB--x Creator: DuplicateKeyError
    Creator->>Creator: log warning and continue
  else success
    DB-->>Creator: insert acknowledged
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • Fixes for bugs encountered  #205 — Modifies the same State model and state creation logic (parents/identifier handling) which affects the canonicalized parents and IDs used by the new state_fingerprint generation.

Poem

I thump through code and crocheted state,
I hash the hops to keep one fate,
If doubles crowd, I blink, then log,
One fingerprint guards every log.
Hooray — united hops align! 🥕

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @NiveditJain, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses an issue where multiple identical states could be created, specifically in contexts related to 'unites' operations. It introduces a robust mechanism to ensure state uniqueness by adding a new 'fingerprint' field to the 'State' model, which is a hash of key state attributes. A unique partial index is then applied to this fingerprint for 'unites' states, preventing duplicates at the database level. Additionally, the state creation process is enhanced with error handling to gracefully manage attempts to insert duplicate states, improving the overall stability and data integrity of the state management system.

Highlights

  • Duplicate State Prevention: This PR introduces a mechanism to prevent the creation of duplicate 'State' entries, particularly in scenarios involving 'unites' operations, by leveraging unique indexing.
  • State Fingerprinting: A new '_fingerprint' field is added to the 'State' model. This fingerprint is a SHA256 hash generated from core state attributes (node_name, namespace_name, identifier, graph_name, run_id, parents), ensuring a unique identifier for each distinct state.
  • Unique Partial Indexing: A unique partial index is now applied to the '_fingerprint' field for 'State' objects where 'does_unites' is true. This ensures that no two 'unites' states with the same fingerprint can exist in the database.
  • Robust Error Handling: The state creation logic is updated to catch 'DuplicateKeyError' during bulk insertions of new states. This prevents application crashes when an attempt is made to insert a state that already exists due to the new unique index.
  • New 'does_unites' Field: A 'does_unites' boolean field has been added to the 'State' model. This field is used in conjunction with the partial index to selectively enforce uniqueness only on states that are part of 'unites' operations.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request aims to prevent the creation of duplicate 'unites' states by introducing a unique, partial index on a state fingerprint. This is a solid approach to handle race conditions. However, my review identified a critical issue where the flag enabling the partial index is never set, which would prevent the fix from working. Additionally, there's a high-severity issue with non-deterministic fingerprint generation. I've provided suggestions to address these points, along with a medium-severity recommendation to adjust the logging for expected errors.

NiveditJain and others added 6 commits August 22, 2025 12:39
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
- Updated the generate_next_state function to include a does_unites parameter, indicating whether the state unites others. This change enhances the clarity and functionality of state generation within the state management logic.
- Updated the State model to sort the parents dictionary by key when generating the fingerprint. This change ensures consistent fingerprint values, enhancing the integrity of state management.
- Updated the State model to rename the private _fingerprint field to state_fingerprint for improved clarity and consistency. The fingerprint generation method has been adjusted accordingly to reflect this change.
@codecov
Copy link

codecov bot commented Aug 22, 2025

Codecov Report

❌ Patch coverage is 63.88889% with 13 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
state-manager/app/models/db/state.py 64.70% 6 Missing ⚠️
state-manager/app/tasks/create_next_states.py 16.66% 5 Missing ⚠️
state-manager/app/controller/enqueue_states.py 84.61% 2 Missing ⚠️

📢 Thoughts on this report? Let us know!

@NiveditJain
Copy link
Member Author

@coderabbitai review

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Aug 22, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@NiveditJain
Copy link
Member Author

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mechanism to prevent the creation of duplicate 'uniting' states by using a state fingerprint and a unique partial MongoDB index. This is a good approach to solving the potential race condition. My review includes a critical fix for the index definition which is currently pointing to a non-existent field. I've also added suggestions to improve the efficiency of fingerprint generation and to make the logging in the case of a duplicate key error more concise and useful.

- Updated the _generate_fingerprint method in the State model to handle cases where does_unites is False, ensuring that the state_fingerprint is cleared appropriately. This change enhances the robustness of fingerprint generation by preventing unnecessary calculations when uniting states is not applicable.
@NiveditJain
Copy link
Member Author

@coderabbitai review

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Aug 22, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@NiveditJain
Copy link
Member Author

@cursoragent review

@NiveditJain
Copy link
Member Author

@coderabbitai re-review

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Aug 22, 2025

@NiveditJain I'll perform a comprehensive re-review of all the changes in this PR.

✅ Actions performed

Full review triggered.

- Updated the State model to improve the description of the does_unites field for clarity.
- Refined the fingerprint generation method to ensure consistent payload formatting and added a unique index for states that unite others.
- Enhanced logging in the create_next_states function to provide more context during DuplicateKeyError occurrences, improving error traceability.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (1)
state-manager/app/models/db/state.py (1)

30-44: Deterministic, canonical fingerprinting — LGTM

  • Using sort_keys=True, separators=(",", ":"), and ensure_ascii=True yields canonical JSON.
  • Parents’ ObjectIds are stringified; combined with sort_keys this gives a stable payload.

This addresses the earlier concern about non-deterministic key order in JSON serialization.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 89fa36a and d9f0fa1.

📒 Files selected for processing (2)
  • state-manager/app/models/db/state.py (2 hunks)
  • state-manager/app/tasks/create_next_states.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
state-manager/app/models/db/state.py (2)
state-manager/app/models/db/base.py (1)
  • BaseDatabaseModel (7-15)
state-manager/app/models/db/graph_template_model.py (1)
  • Settings (24-31)
state-manager/app/tasks/create_next_states.py (1)
state-manager/app/models/db/state.py (1)
  • State (11-58)
🔇 Additional comments (5)
state-manager/app/models/db/state.py (4)

1-1: Import of IndexModel is appropriate

Needed for partial unique index configuration on the model. No issues.


5-5: Beanie event imports look correct

Importing Insert, Replace, Save, and before_event matches the hook usage below.


7-8: Hashing and JSON imports are appropriate

These are the right choices for deterministic fingerprinting.


46-58: Manual Verification Required: Confirm Partial Unique Index in MongoDB

The automated check couldn’t run because the required environment variables weren’t set. Please verify the index definition manually by either:

  • Setting the following environment variables and rerunning the script:
    • MONGODB_URI (e.g., mongodb://localhost:27017)
    • DB_NAME (e.g., exosphere)
    • STATE_COLLECTION_NAME (e.g., state)

    Then re-execute:

    mongo "$MONGODB_URI/$DB_NAME" --quiet --eval \
    "var idxs = db.getCollection('$STATE_COLLECTION_NAME').getIndexes();
     var target = idxs.filter(idx => idx.name === 'uniq_state_fingerprint_unites');
     if (target.length) { printjson(target[0]); } else { print('Index not found'); }"

    Share the printed JSON to confirm:
    – key: { state_fingerprint: 1 }
    – unique: true
    – partialFilterExpression: { does_unites: true }

  • Or within a Mongo shell connected to your target database, run:

    db.getCollection('state').getIndexes().filter(i => i.name === 'uniq_state_fingerprint_unites')

    and verify the index definition matches the model.

Once confirmed, you can rely on the partial unique index for production de-duplication.

state-manager/app/tasks/create_next_states.py (1)

139-139: Setting does_unites correctly at creation time

This ensures the partial unique index is in scope for unite states. Good fix that aligns with the model.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
state-manager/app/controller/enqueue_states.py (3)

50-56: Access fields from raw Mongo documents; attribute access will raise

states contains raw dicts from Motor/PyMongo, so state.id / state.node_name will fail. Use dict access and normalize _id.

-                    state_id=str(state.id),
-                    node_name=state.node_name,
-                    identifier=state.identifier,
-                    inputs=state.inputs,
-                    created_at=state.created_at
+                    state_id=str(state["_id"]),
+                    node_name=state["node_name"],
+                    identifier=state["identifier"],
+                    inputs=state.get("inputs", {}),
+                    created_at=state.get("created_at")

48-49: Ensure response.status is a string, not an Enum instance

EnqueueResponseModel.status is str. Use .value explicitly for clarity and to avoid downstream JSON serialization surprises.

-            status=StateStatusEnum.QUEUED,
+            status=StateStatusEnum.QUEUED.value,

27-61: Update mocks in test_enqueue_states.py to use find_one_and_update instead of State.find

The new enqueue_states implementation no longer calls State.find(...).limit().to_list(), but uses find_one_and_update on the Motor collection inside find_state. You’ll need to adjust the unit tests accordingly:

• In state-manager/tests/unit/controller/test_enqueue_states.py:
– Replace any patch of State.find and its .limit().to_list() chain with a patch of State.get_motor_collection().
– Stub the returned collection’s find_one_and_update method (an AsyncMock) to yield your dict-shaped state documents for the first N calls, then return None.
– Build your expected StateModel from those dicts (mapping _idstate_id, node_name, identifier, etc.).

• Example adjustment:

- @patch('app.controller.enqueue_states.State.find')
- async def test_enqueue_states_success(..., mock_find):
-     mock_query = MagicMock()
-     mock_query.limit.return_value = mock_query
-     mock_query.to_list = AsyncMock(return_value=[mock_state])
-     mock_find.return_value = mock_query
+ @patch('app.controller.enqueue_states.State.get_motor_collection')
+ async def test_enqueue_states_success(..., mock_get_coll):
+     coll = AsyncMock()
+     # First call returns a document, subsequent calls return None
+     coll.find_one_and_update.side_effect = [
+         {"_id": PydanticObjectId(), "node_name":"node1", "identifier":"id1",
+          "inputs":{"key":"val"}, "created_at": now},
+         None, None, /* up to batch_size */
+     ]
+     mock_get_coll.return_value = coll
+     result = await enqueue_states(
+         namespace, EnqueueRequestModel(nodes=["node1"], batch_size=3), req_id
+     )
+     assert result.count == 1
+     assert result.status == StateStatusEnum.QUEUED

Make these changes before rerunning the tests so they align with the updated enqueue_states logic.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between d9f0fa1 and b0ab37c.

📒 Files selected for processing (1)
  • state-manager/app/controller/enqueue_states.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
state-manager/app/controller/enqueue_states.py (5)
state-manager/app/models/db/state.py (1)
  • State (11-58)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-15)
state-manager/app/models/enqueue_request.py (1)
  • EnqueueRequestModel (4-6)
state-manager/app/models/enqueue_response.py (1)
  • EnqueueResponseModel (14-19)
state-manager/tests/unit/controller/test_enqueue_states.py (5)
  • test_enqueue_states_multiple_states (116-162)
  • test_enqueue_states_success (40-83)
  • test_enqueue_states_no_states_found (86-113)
  • TestEnqueueStates (11-216)
  • test_enqueue_states_database_error (165-184)
🔇 Additional comments (1)
state-manager/app/controller/enqueue_states.py (1)

13-26: Ensure atomic update returns the updated State instance and uses an async collection helper

Please verify the following before merging:

  • Confirm that State.get_pymongo_collection() actually returns an async Motor collection. If it’s a synchronous PyMongo handle, switch to or add a State.get_motor_collection() helper and use that here.
  • Include return_document=ReturnDocument.AFTER on your find_one_and_update so you receive the document after the status change.
  • Wrap the resulting raw dict in your Pydantic State model (e.g. via State.parse_obj(doc)) to match the declared return type.
  • If your database stores enum values as raw strings, use .value when matching and updating StateStatusEnum.

Suggested diff:

--- a/state-manager/app/controller/enqueue_states.py
+++ b/state-manager/app/controller/enqueue_states.py
@@ -13,7 +13,20 @@ async def find_state(namespace_name: str, nodes: list[str]) -> State | None:
-    return await State.get_pymongo_collection().find_one_and_update(
-        {
-            "namespace_name": namespace_name,
-            "status": StateStatusEnum.CREATED,
-            "node_name": {"$in": nodes}
-        },
-        {"$set": {"status": StateStatusEnum.QUEUED}}
-    )
+    from pymongo import ReturnDocument
+
+    # Ensure we’re using an async Motor collection
+    collection = State.get_pymongo_collection()  # rename to get_motor_collection() if needed
+
+    doc = await collection.find_one_and_update(
+        {
+            "namespace_name": namespace_name,
+            "status": StateStatusEnum.CREATED.value,
+            "node_name": {"$in": nodes},
+        },
+        {"$set": {"status": StateStatusEnum.QUEUED.value}},
+        return_document=ReturnDocument.AFTER,
+    )
+    if doc is None:
+        return None
+
+    # Rehydrate into a State instance
+    return State.parse_obj(doc)


Please double-check your helper’s name and async behavior, and confirm whether enums require .value when persisting.

- Updated unit tests in test_enqueue_states.py to mock State.get_pymongo_collection().find_one_and_update() instead of using find().limit().to_list() for better clarity and efficiency.
- Adjusted assertions to reflect changes in the expected results, ensuring that batch sizes are correctly handled.
- Enhanced error handling in tests to verify graceful handling of exceptions during database operations.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
state-manager/tests/unit/controller/test_enqueue_states.py (1)

151-156: Avoid order-dependent assertions with concurrent operations

asyncio.gather preserves task order, but DB scheduling can still yield nondeterministic data selection. Make the assertions order-agnostic to prevent flaky tests.

Apply this diff:

-        assert result.states[0].node_name == "node1"
-        assert result.states[1].node_name == "node2"
+        assert {s.node_name for s in result.states} == {"node1", "node2"}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between b0ab37c and 4cea669.

📒 Files selected for processing (1)
  • state-manager/tests/unit/controller/test_enqueue_states.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
state-manager/tests/unit/controller/test_enqueue_states.py (2)
state-manager/app/models/state_status_enum.py (1)
  • StateStatusEnum (4-15)
state-manager/app/controller/enqueue_states.py (2)
  • enqueue_states (27-64)
  • enqueue_states (13-54)
🔇 Additional comments (1)
state-manager/tests/unit/controller/test_enqueue_states.py (1)

50-53: Mock the async Motor collection, not the sync PyMongo one
We weren’t able to locate a get_motor_collection helper in BaseDatabaseModel—please confirm that your State model (in state-manager/app/models/db/base.py) actually exposes a get_motor_collection() class method. If it doesn’t, you’ll need to add one (or adjust the test to match the real async helper).

• In state-manager/tests/unit/controller/test_enqueue_states.py (lines 50–53):

-        # Mock State.get_pymongo_collection().find_one_and_update()
+        # Mock State.get_motor_collection().find_one_and_update()
         mock_collection = MagicMock()
         mock_collection.find_one_and_update = AsyncMock(return_value=mock_state)
-        mock_state_class.get_pymongo_collection = MagicMock(return_value=mock_collection)
+        mock_state_class.get_motor_collection = MagicMock(return_value=mock_collection)

• Near the top of that test file (outside the snippet) add:

import pytest
pytestmark = pytest.mark.asyncio

Before merging, verify:

  • That BaseDatabaseModel or its parent actually defines get_motor_collection().
  • If it only provides get_pymongo_collection(), either implement an async wrapper or update both code and tests to use the correct helper.

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
state-manager/app/tasks/create_next_states.py (1)

237-247: Precompute fingerprints before bulk inserting and handle BulkWriteError

The insert_many method in Beanie does not trigger your @before_event hooks (e.g., Insert), so any logic in those handlers—like generating state_fingerprint—won’t run on bulk inserts. To ensure deterministic deduplication and avoid unused-import errors (Ruff F401 on BulkWriteError), please:

  • Replace the len(new_unit_states) > 0 check with a truthy check (if new_unit_states:).
  • Manually precompute each state’s fingerprint before calling insert_many, since bulk operations skip per-document hooks.
  • Catch BulkWriteError first to handle duplicate-key errors in the bulk path (error code 11000), then fall back to catching DuplicateKeyError. Log only the list of fingerprints for concise diagnostics.

Suggested diff:

-        try:
-            if len(new_unit_states) > 0:
-                await State.insert_many(new_unit_states)
-        except DuplicateKeyError:
-            logger.warning(
-                f"Caught duplicate key error for new unit states in namespace={namespace}, "
-                f"graph={graph_name}, likely due to a race condition. "
-                f"Attempted to insert {len(new_unit_states)} states"
-            )
+        try:
+            if new_unit_states:
+                # Bulk inserts skip Beanie per-document hooks; ensure fingerprints exist
+                for s in new_unit_states:
+                    if s.does_unites:
+                        s._generate_fingerprint()
+                await State.insert_many(new_unit_states)
+        except BulkWriteError as e:
+            write_errors = (e.details or {}).get("writeErrors", [])
+            # Handle duplicate-key errors in bulk operations
+            if any(err.get("code") == 11000 for err in write_errors):
+                fps = [s.state_fingerprint for s in new_unit_states if s.does_unites]
+                logger.warning(
+                    f"Caught duplicate-key (bulk) for new unit states in namespace={namespace}, "
+                    f"graph={graph_name}, race condition likely. "
+                    f"Attempted {len(new_unit_states)} inserts; fingerprints={fps}"
+                )
+            else:
+                raise
+        except DuplicateKeyError:
+            # Fallback for single-document duplicate-key errors
+            fps = [s.state_fingerprint for s in new_unit_states if s.does_unites]
+            logger.warning(
+                f"Caught duplicate-key error for new unit states in namespace={namespace}, "
+                f"graph={graph_name}, race condition likely. "
+                f"Attempted {len(new_unit_states)} inserts; fingerprints={fps}"
+            )

Optional: if your Beanie version supports ordered=False on insert_many, you can allow non-duplicate documents to persist in the same batch. Would you like a focused unit test that runs two concurrent insert paths and asserts only one state per fingerprint is persisted?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 4cea669 and d0305bc.

📒 Files selected for processing (1)
  • state-manager/app/tasks/create_next_states.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
state-manager/app/tasks/create_next_states.py (1)
state-manager/app/models/db/state.py (2)
  • State (11-58)
  • State (8-18)
🪛 GitHub Actions: Ruff check on changed files only
state-manager/app/tasks/create_next_states.py

[error] 2-2: Ruff: F401 'BulkWriteError' imported but unused. Remove unused import.

🔇 Additional comments (2)
state-manager/app/tasks/create_next_states.py (2)

62-62: Benign formatting change

Extra blank line before get_dependents improves readability. No action needed.


139-139: Correctly flag unit states for partial unique index enforcement

Setting does_unites based on next_state_node_template.unites ensures the partial unique index on state_fingerprint applies only to unit states. This directly supports deduplication and aligns with the State model’s Settings.indexes.

- Updated the find_state function to return a State object or None, improving clarity in state retrieval.
- Modified the create_next_states task to handle BulkWriteError in addition to DuplicateKeyError for better error management.
- Refactored unit tests in test_enqueue_states.py to mock find_state instead of directly mocking database calls, enhancing test readability and maintainability.
- Removed unused AsyncMock import from test_enqueue_states.py, streamlining the test code for better readability.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant