Skip to content

fix: harden TaskEngine error handling, types, and test coverage#322

Closed
Aureliolo wants to merge 5 commits into
mainfrom
feat/task-engine
Closed

fix: harden TaskEngine error handling, types, and test coverage#322
Aureliolo wants to merge 5 commits into
mainfrom
feat/task-engine

Conversation

@Aureliolo
Copy link
Copy Markdown
Owner

Summary

  • Typed error hierarchy: Add TaskNotFoundError, TaskEngineQueueFullError, TaskVersionConflictError for precise error classification — API controllers catch these directly instead of parsing error strings
  • Error sanitization: Internal exception details (SQL paths, stack traces) no longer leak to API responses
  • Immutable field protection: Model validators on UpdateTaskMutation and TransitionTaskMutation reject writes to id, status, created_by, created_at, etc.
  • Processing loop hardening: Guard _process_one against unhandled exceptions, thread previous_status through mutation results and snapshots, add _fail_remaining_futures for drain timeout cleanup
  • Logging coverage: Add event constants and structured logging for engine creation, version conflicts, and loop errors
  • AgentEngine integration: Split broad except Exception in _report_to_task_engine into TaskMutationError (warning) vs Exception (error with divergence note); extract _TERMINAL_STATUSES constant
  • Code quality: Extract _not_found_result helper, use Self in model validators, add _check_consistency validator on TaskMutationResult, exhaustive match default branch
  • Docs: Update tech-stack "State coordination" to Adopted, add TaskEngine to CLAUDE.md engine description and logging events, add TaskEngine architecture subsection to engine design page

Test plan

  • TestAppStateTaskEngine — 6 tests for task_engine property, has_task_engine, set_task_engine
  • TestReportToTaskEngine — 5 tests: no-op without engine, skip non-terminal, report terminal, swallow TaskMutationError, swallow unexpected errors
  • TestAppLifecycle — 2 new tests: task engine startup failure cleans up persistence+bus, shutdown task engine failure doesn't propagate
  • TestVersionConflictOnTransition — version mismatch returns failure
  • TestCancelNotFound — cancel on non-existent task returns failure
  • TestPreviousStatusprevious_status populated correctly (create=None, transition=CREATED, cancel=ASSIGNED)
  • TestImmutableFieldRejection — update/transition reject immutable fields
  • TestTypedErrors — convenience methods raise TaskNotFoundError
  • All 6925 tests pass, 94.39% coverage
  • ruff check + format clean
  • mypy strict clean

Review coverage

Pre-reviewed by 10 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, security-reviewer, docs-consistency). 37 findings consolidated, all implemented.

Closes #204

🤖 Generated with Claude Code

Implement actor-like TaskEngine that owns all task state mutations via
asyncio.Queue. A single background task processes mutations sequentially
using model_copy(update=...), persists results, and publishes snapshots
to the message bus. Reads bypass the queue for direct persistence access.

- Add TaskEngine core with start/stop lifecycle, submit/convenience methods
- Add 5 mutation types (create, update, transition, delete, cancel)
- Add TaskMutationResult response and TaskStateChanged event models
- Add TaskEngineConfig (queue size, drain timeout, snapshot toggle)
- Add 4 error types (TaskEngineError, NotRunning, Mutation, VersionConflict)
- Wire into API controllers, AppState, app lifecycle, and config
- Add optional AgentEngine report-back for terminal task status
- Add 57 unit tests covering all mutations, ordering, versioning, drain

Closes #204
Pre-reviewed by 10 agents, 37 findings addressed:

- Add exhaustive match default + typed error hierarchy (TaskNotFoundError,
  TaskEngineQueueFullError, TaskVersionConflictError)
- Sanitize internal exception details from API responses
- Add immutable field rejection validators on UpdateTaskMutation and
  TransitionTaskMutation
- Thread previous_status through TaskMutationResult and snapshots
- Add consistency model_validator to TaskMutationResult
- Guard _processing_loop against unhandled exceptions
- Fix startup cleanup to handle task engine failures
- Replace assert with proper error handling in convenience methods
- Add _fail_remaining_futures for drain timeout cleanup
- Add comprehensive logging coverage (creation, conflicts, loop errors)
- Add _not_found_result helper to reduce duplication
- Extract _TERMINAL_STATUSES module constant
- Use Self return type in model validators
- Split broad except in _report_to_task_engine (TaskMutationError vs Exception)
- Update docs: tech-stack Adopted, CLAUDE.md engine description, engine.md
  TaskEngine architecture subsection
- Add tests: AppState.task_engine, _report_to_task_engine, app lifecycle,
  version conflicts, cancel not-found, previous_status, immutable fields,
  typed errors
Copilot AI review requested due to automatic review settings March 12, 2026 12:57
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Mar 12, 2026

Dependency Review

✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.

Scanned Files

None

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Mar 12, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 96b7a3f4-0f29-449d-9190-4a4ed741cfac

📥 Commits

Reviewing files that changed from the base of the PR and between c92fed8 and 3254142.

📒 Files selected for processing (1)
  • tests/unit/engine/task_engine_helpers.py

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Centralized task state engine for reliable single-writer task processing; API now routes task CRUD and transitions through the engine with consistent error responses.
  • Configuration

    • New task engine configuration (queue sizing, drain timeout, snapshot publishing) with sensible defaults.
  • Observability

    • Added standardized task-engine event names for lifecycle, mutations, queue/backpressure, and snapshot publishing.
  • Documentation

    • Architecture and design docs updated to mark state coordination as adopted and expand conventions.
  • Tests

    • Extensive unit and integration tests covering lifecycle, mutations, concurrency, and drain behavior.

Walkthrough

Adds a centralized single-writer TaskEngine (queue-based mutation processing, optimistic concurrency, snapshot publishing) and integrates it into API lifecycle, configuration, controllers, AgentEngine reporting, observability events, and comprehensive unit/integration tests.

Changes

Cohort / File(s) Summary
Core Engine
src/ai_company/engine/task_engine.py, src/ai_company/engine/task_engine_models.py, src/ai_company/engine/task_engine_config.py, src/ai_company/engine/errors.py
New TaskEngine implementation, mutation/event models, immutable config model, and new error hierarchy for engine and mutation failures.
Engine Public Surface
src/ai_company/engine/__init__.py
Exports TaskEngine, TaskEngineConfig, mutation models, and new error symbols on the engine package API.
API Lifecycle & State
src/ai_company/api/app.py, src/ai_company/api/state.py, src/ai_company/config/schema.py, src/ai_company/config/defaults.py
App lifecycle and AppState extended to accept/store TaskEngine; config schema/defaults add task_engine entry; create_app and lifecycle wiring updated.
API Controllers
src/ai_company/api/controllers/tasks.py
Controllers moved from TaskRepository to TaskEngine for CRUD; added mappings from engine errors to API errors; create/update/transition/delete now use engine mutation APIs.
Agent Integration
src/ai_company/engine/agent_engine.py
AgentEngine accepts optional TaskEngine and reports terminal execution results to the TaskEngine (best-effort), handling TaskMutationError and other failures.
Observability
src/ai_company/observability/events/task_engine.py, src/ai_company/observability/events/api.py
New task_engine event constants and a new API event API_TASK_TRANSITION_FAILED.
Docs
docs/design/engine.md, docs/architecture/tech-stack.md, CLAUDE.md
Design and architecture docs updated to document TaskEngine, conventions, and execution-loop example change.
Tests & Helpers
tests/unit/engine/*, tests/unit/api/*, tests/unit/observability/test_events.py
Extensive new tests: models, lifecycle, mutations, integration, agent reporting; test fixtures and in-memory fake persistence/message-bus helpers; test_client fixture updated to inject TaskEngine.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API as API Controller
    participant Engine as TaskEngine
    participant Queue as asyncio.Queue
    participant Persistence as PersistenceBackend
    participant Bus as MessageBus

    Client->>API: POST /tasks (create)
    API->>Engine: create_task(data, requested_by)
    Engine->>Engine: validate running
    Engine->>Queue: submit(CreateTaskMutation)
    Engine-->>API: return/pending future (202)
    
    par Background processing
        Queue->>Engine: dequeue mutation
        Engine->>Engine: _process_one(envelope)
        Engine->>Persistence: tasks.save(task with bumped version)
        Persistence-->>Engine: saved task
        Engine->>Bus: publish(TaskStateChanged)
        Bus-->>Engine: ack (best-effort)
        Engine-->>API: future.set_result(TaskMutationResult)
    end

    Client->>API: GET /tasks/{id}
    API->>Engine: get_task(task_id)
    Engine->>Persistence: tasks.get(task_id)  -- read-through
    Persistence-->>Engine: Task
    Engine-->>API: Task
    API-->>Client: 200 OK
Loading
sequenceDiagram
    participant Agent as AgentEngine
    participant Execution as ExecutionLoop
    participant TaskEng as TaskEngine
    participant Persistence as PersistenceBackend

    Agent->>Execution: run(task)
    Execution-->>Agent: ExecutionResult(status=COMPLETED)
    Agent->>Agent: _post_execution_pipeline(result)
    alt TaskEngine configured
        Agent->>TaskEng: transition_task(task_id, COMPLETED, requested_by=agent)
        TaskEng->>Persistence: apply transition, bump version
        Persistence-->>TaskEng: updated task
        TaskEng->>TaskEng: publish snapshot (optional)
        TaskEng-->>Agent: success
    else no TaskEngine
        Agent-->>Agent: no-op (best-effort)
    end
    alt Publish/report fails
        Agent->>Agent: log warning/error (non-fatal)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 35.92% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately reflects the main focus: hardening TaskEngine error handling, types, and test coverage. It's specific and summarizes the core changes.
Description check ✅ Passed The description is comprehensive and directly related to the changeset, detailing typed error hierarchy, field protection, loop hardening, logging improvements, and test coverage.
Linked Issues check ✅ Passed The PR fully addresses issue #204 objectives: implements TaskEngine single-writer with asyncio.Queue, sequential mutation application, snapshot publishing, version tracking, lifecycle management, mutation types, and TaskMutationResult/TaskStateChanged models.
Out of Scope Changes check ✅ Passed All changes are within scope of #204: TaskEngine implementation, error handling, immutable field protection, and integration with API and AgentEngine. Documentation updates and test additions directly support the main objectives.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/task-engine
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch feat/task-engine
📝 Coding Plan for PR comments
  • Generate coding plan

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly hardens the TaskEngine by centralizing task state management, improving error handling, and enhancing data integrity. It refactors task-related API endpoints to leverage the new engine, ensuring consistent state transitions and robust error reporting. The changes also introduce comprehensive logging and updated documentation to reflect the new architecture, making the system more reliable and easier to understand.

Highlights

  • Typed Error Hierarchy: Introduced TaskNotFoundError, TaskEngineQueueFullError, and TaskVersionConflictError for precise error classification, allowing API controllers to catch specific error types instead of parsing error strings.
  • Error Sanitization: Implemented mechanisms to prevent internal exception details, such as SQL paths and stack traces, from leaking into API responses.
  • Immutable Field Protection: Added model validators to UpdateTaskMutation and TransitionTaskMutation to reject write attempts on immutable fields like id, status, created_by, and created_at.
  • Processing Loop Hardening: Enhanced the _process_one method with unhandled exception guards, threaded previous_status through mutation results, and added _fail_remaining_futures for robust drain timeout cleanup.
  • Logging Coverage: Expanded structured logging with new event constants for engine creation, version conflicts, and loop errors, improving observability.
  • AgentEngine Integration: Refined error handling in _report_to_task_engine to differentiate between TaskMutationError (logged as warning) and other Exception types (logged as error with divergence note), and extracted a _TERMINAL_STATUSES constant.
  • Code Quality Improvements: Refactored common logic into a _not_found_result helper, utilized Self in model validators, added a _check_consistency validator on TaskMutationResult, and ensured exhaustive matching in default branches.
  • Documentation Updates: Updated the tech-stack to reflect 'State coordination' as Adopted, included TaskEngine in CLAUDE.md engine description and logging events, and added a dedicated TaskEngine architecture subsection to the engine design page.
Changelog
  • CLAUDE.md
    • Updated engine module description to include TaskEngine.
    • Added TASK_ENGINE_STARTED to the list of logging event constants.
  • docs/architecture/tech-stack.md
    • Updated 'State coordination' status from 'Planned' to 'Adopted'.
  • docs/design/engine.md
    • Added a new section detailing TaskEngine architecture, mutation types, error handling, and lifecycle.
  • src/ai_company/api/app.py
    • Imported TaskEngine.
    • Modified _build_lifecycle to include task_engine in startup and shutdown processes.
    • Updated _cleanup_on_failure to handle TaskEngine cleanup during startup failures.
    • Updated _safe_startup and _safe_shutdown to manage TaskEngine lifecycle.
    • Added task_engine parameter to create_app function.
  • src/ai_company/api/controllers/tasks.py
    • Updated module docstring to reflect TaskEngine usage.
    • Imported TaskMutationError, TaskNotFoundError, and CreateTaskData.
    • Changed TaskController docstring to reference TaskEngine.
    • Refactored list_tasks, get_task, create_task, update_task, transition_task, and delete_task to use app_state.task_engine.
    • Implemented typed error handling for TaskNotFoundError and TaskMutationError in API endpoints.
  • src/ai_company/api/state.py
    • Imported TaskEngine.
    • Added _task_engine to AppState slots.
    • Added task_engine parameter to AppState constructor.
    • Added task_engine property to AppState to retrieve the TaskEngine instance.
    • Added has_task_engine property to check if TaskEngine is configured.
    • Added set_task_engine method for deferred TaskEngine initialization.
  • src/ai_company/config/defaults.py
    • Added task_engine entry to default_config_dict.
  • src/ai_company/config/schema.py
    • Imported TaskEngineConfig.
    • Added task_engine field to RootConfig with TaskEngineConfig as its type.
  • src/ai_company/engine/init.py
    • Updated module docstring to include task engine in re-exports.
    • Exported new TaskEngine related errors: TaskEngineError, TaskEngineNotRunningError, TaskEngineQueueFullError, TaskMutationError, TaskNotFoundError, TaskVersionConflictError.
    • Exported new TaskEngine related models: TaskEngine, TaskEngineConfig, CancelTaskMutation, CreateTaskData, CreateTaskMutation, DeleteTaskMutation, TaskMutation, TaskMutationResult, TaskStateChanged, TransitionTaskMutation, UpdateTaskMutation.
  • src/ai_company/engine/agent_engine.py
    • Imported TaskMutationError and TaskEngine.
    • Defined _TERMINAL_STATUSES constant for task statuses that trigger reporting to TaskEngine.
    • Added task_engine parameter to AgentEngine constructor.
    • Modified _post_execution_pipeline to call _report_to_task_engine.
    • Implemented _report_to_task_engine to report terminal execution statuses to the centralized TaskEngine, handling TaskMutationError and other exceptions gracefully.
  • src/ai_company/engine/errors.py
    • Added TaskEngineError as a base exception for task engine errors.
    • Added TaskEngineNotRunningError for mutations submitted to a stopped engine.
    • Added TaskEngineQueueFullError for when the mutation queue is at capacity.
    • Added TaskMutationError for general task mutation failures.
    • Added TaskNotFoundError for when a task is not found during mutation.
    • Added TaskVersionConflictError for optimistic concurrency mismatches.
  • src/ai_company/engine/task_engine.py
    • Added new file: Implemented the TaskEngine class, a centralized single-writer for task state mutations.
    • Defined _MutationEnvelope dataclass for pairing mutations with futures.
    • Implemented start() and stop() lifecycle methods, including queue draining and cleanup of abandoned futures.
    • Provided submit() method for mutations and convenience methods like create_task(), update_task(), transition_task(), delete_task(), and cancel_task().
    • Implemented read-through methods get_task() and list_tasks() that bypass the queue.
    • Developed _processing_loop() and _process_one() for sequential mutation processing.
    • Included _apply_mutation() dispatch logic for different mutation types.
    • Added _not_found_result() helper for consistent error responses.
    • Implemented _apply_create(), _apply_update(), _apply_transition(), _apply_delete(), and _apply_cancel() for specific mutation logic.
    • Integrated _publish_snapshot() for sending TaskStateChanged events to the message bus.
    • Implemented _bump_version() and _check_version() for optimistic concurrency control.
  • src/ai_company/engine/task_engine_config.py
    • Added new file: Defined TaskEngineConfig Pydantic model for engine configuration, including max_queue_size, drain_timeout_seconds, and publish_snapshots.
  • src/ai_company/engine/task_engine_models.py
    • Added new file: Defined Pydantic models for TaskEngine requests, responses, and events.
    • Introduced CreateTaskData for task creation payload.
    • Defined CreateTaskMutation, UpdateTaskMutation, TransitionTaskMutation, DeleteTaskMutation, and CancelTaskMutation as discriminated union TaskMutation types.
    • Implemented immutable field validation for UpdateTaskMutation and TransitionTaskMutation.
    • Defined TaskMutationResult for mutation outcomes, including success status, task snapshot, version, and error details.
    • Defined TaskStateChanged event for publishing state changes to the message bus.
  • src/ai_company/observability/events/task_engine.py
    • Added new file: Defined event constants for TaskEngine related logging.
  • tests/unit/api/conftest.py
    • Imported Generator.
    • Added fake_task_engine fixture that provides a TaskEngine backed by fake persistence.
    • Updated test_client fixture to accept and pass fake_task_engine to create_app.
  • tests/unit/api/test_app.py
    • Modified test_safe_startup_bus_failure_cleans_up to pass None for task_engine.
    • Modified test_safe_shutdown_persistence_failure_does_not_propagate to pass None for task_engine.
    • Added test_task_engine_failure_cleans_up to verify persistence and bus cleanup if TaskEngine startup fails.
    • Added test_shutdown_task_engine_failure_does_not_propagate to ensure TaskEngine stop failures during shutdown are logged but not propagated.
  • tests/unit/api/test_state.py
    • Added TestAppStateTaskEngine class with tests for task_engine property, has_task_engine, and set_task_engine methods in AppState.
  • tests/unit/engine/test_agent_engine.py
    • Imported TaskMutationError.
    • Added TestReportToTaskEngine class with tests for _report_to_task_engine behavior.
    • Verified no-op when TaskEngine is not configured.
    • Confirmed non-terminal statuses are skipped.
    • Ensured terminal statuses are reported.
    • Tested that TaskMutationError and unexpected exceptions from TaskEngine are logged and swallowed.
  • tests/unit/engine/test_task_engine.py
    • Added new file: Implemented comprehensive unit tests for the TaskEngine class.
    • Included tests for lifecycle (start/stop), submitting to a stopped engine, and various mutation types (create, update, transition, delete, cancel).
    • Covered read-through functionality, version tracking, snapshot publishing, sequential ordering, queue full scenarios, and error propagation.
  • tests/unit/engine/test_task_engine_models.py
    • Added new file: Implemented unit tests for CreateTaskData, CreateTaskMutation, UpdateTaskMutation, TransitionTaskMutation, DeleteTaskMutation, CancelTaskMutation, TaskMutationResult, and TaskStateChanged models.
    • Verified construction, field validation (including immutable fields), and serialization.
  • tests/unit/observability/test_events.py
    • Added task_engine to the list of expected domain modules for event discovery.
Activity
  • The pull request was pre-reviewed by 10 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, security-reviewer, docs-consistency).
  • 37 findings from the pre-review were consolidated and all implemented.
  • All 6925 tests passed, maintaining 94.39% coverage.
  • Ruff check and format are clean.
  • Mypy strict is clean.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 12, 2026 12:58 — with GitHub Actions Inactive
@codecov
Copy link
Copy Markdown

codecov Bot commented Mar 12, 2026

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
6933 1 6932 6
View the top 1 failed test(s) by shortest run time
tests.unit.config.test_schema.TestRootConfig::test_factory
Stack Traces | 0.004s run time
.../unit/config/test_schema.py:512: in test_factory
    cfg = RootConfigFactory.build()
          ^^^^^^^^^^^^^^^^^^^^^^^^^
.venv/lib/python3.14.../polyfactory/factories/pydantic_factory.py:536: in build
    processed_kwargs = cls.process_kwargs(**kwargs)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.venv/lib/python3.14.../polyfactory/factories/base.py:1097: in process_kwargs
    field_result = cls.get_field_value(
.venv/lib/python3.14.../polyfactory/factories/pydantic_factory.py:509: in get_field_value
    result = super().get_field_value(
.venv/lib/python3.14.../polyfactory/factories/base.py:839: in get_field_value
    return handle_collection_type(
.venv/lib/python3.14.../polyfactory/value_generators/complex_types.py:82: in handle_collection_type
    return container_type(
.venv/lib/python3.14.../polyfactory/value_generators/complex_types.py:83: in <genexpr>
    factory.get_field_value(child, field_build_parameters=field_build_parameters, build_context=build_context)
.venv/lib/python3.14.../polyfactory/factories/pydantic_factory.py:509: in get_field_value
    result = super().get_field_value(
.venv/lib/python3.14.../polyfactory/factories/base.py:791: in get_field_value
    return cls._get_or_create_factory(model=unwrapped_annotation).build(
.venv/lib/python3.14.../polyfactory/factories/pydantic_factory.py:538: in build
    return cls._create_model(kwargs["_build_context"], **processed_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.venv/lib/python3.14.../polyfactory/factories/pydantic_factory.py:572: in _create_model
    return cls.__model__(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
E   pydantic_core._pydantic_core.ValidationError: 1 validation error for EscalationPath
E     Value error, Escalation must be between different departments: '6' == '6' [type=value_error, input_value={'from_department': '6', ...6', 'priority_boost': 1}, input_type=dict]
E       For further information visit https://errors.pydantic.dev/2.12/v/value_error

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@greptile-apps
Copy link
Copy Markdown

greptile-apps Bot commented Mar 12, 2026

Greptile Summary

This PR introduces the centralized TaskEngine — a single-writer actor that serializes all task state mutations through an asyncio.Queue, eliminating race conditions from concurrent agent writes. It also wires the engine into the API lifecycle (app.py, AppState, TaskController) and adds a _report_to_task_engine post-execution hook to AgentEngine. The implementation is thorough: typed error hierarchy, immutable-field validators, optimistic concurrency, snapshot publishing, structured logging, and comprehensive test coverage.

Two logic issues found:

  • In-flight future left unresolved on drain-timeout cancellation (task_engine.py): When stop() cancels the processing task, asyncio.CancelledError (BaseException, not Exception) bypasses both except Exception handlers in _process_one and _processing_loop. The mutation currently being processed has its future silently abandoned. _fail_remaining_futures() only covers items still in the queue. Any caller awaiting that future will hang.
  • created_at missing from immutable field sets (task_engine_models.py): The PR description explicitly lists created_at as a protected field, but neither _IMMUTABLE_TASK_FIELDS nor _IMMUTABLE_OVERRIDE_FIELDS includes it, allowing clients to overwrite task creation timestamps via UpdateTaskMutation.

Two style notes:

  • Cleanup failure log event: _cleanup_on_failure in app.py emits API_APP_STARTUP for bridge and task engine teardown errors, blending cleanup failures into startup-related log queries.
  • In-memory version tracking: _versions does not survive process restarts; callers caching version numbers across restarts will see spurious TaskVersionConflictError. Worth documenting explicitly.

Confidence Score: 3/5

  • Safe to merge after addressing the unresolved-future and missing-created_at issues; rest of the implementation is solid.
  • Two logic issues lower confidence: (1) abandoned futures on drain-timeout cancellation could cause hangs in production shutdown edge cases, and (2) created_at is not protected by the immutable-field validators despite the PR description stating it should be. The broader architecture is well-designed, tests are comprehensive, and all other code quality signals (mypy strict, ruff, 94% coverage) are clean.
  • src/ai_company/engine/task_engine.py (cancellation handling in _processing_loop) and src/ai_company/engine/task_engine_models.py (immutable field sets)

Important Files Changed

Filename Overview
src/ai_company/engine/task_engine.py New 907-line centralized single-writer task engine; two issues found: in-flight futures left unresolved on drain-timeout cancellation, and in-memory version tracking lost on restart.
src/ai_company/engine/task_engine_models.py New mutation/result/event models with immutable-field validators; created_at is absent from both _IMMUTABLE_TASK_FIELDS and _IMMUTABLE_OVERRIDE_FIELDS despite being listed in the PR description as a protected field.
src/ai_company/api/app.py Lifecycle management extended for task engine startup/shutdown/cleanup; cleanup failure paths incorrectly log under API_APP_STARTUP event, which will pollute startup-related log queries.
src/ai_company/api/controllers/tasks.py Controller migrated cleanly from direct persistence calls to TaskEngine typed convenience methods with comprehensive error mapping to API error types.
src/ai_company/engine/errors.py Well-structured typed error hierarchy added: TaskEngineError → TaskMutationError → {TaskNotFoundError, TaskVersionConflictError, TaskInternalError}; TaskEngineNotRunningError and TaskEngineQueueFullError complete the set.
src/ai_company/engine/agent_engine.py Clean integration of _report_to_task_engine as a best-effort post-execution hook; _REPORTABLE_STATUSES constant extracted and TaskMutationError vs generic Exception split is well reasoned.
src/ai_company/engine/task_engine_config.py Frozen Pydantic config model with sensible defaults (queue=1000, drain=10s); max_queue_size=0 correctly maps to unbounded asyncio.Queue.
src/ai_company/observability/events/task_engine.py New event constants module; all key lifecycle, mutation, snapshot, versioning, and error events are covered and match actual usage in task_engine.py.
tests/unit/engine/test_task_engine_integration.py Comprehensive integration tests covering snapshot publishing, concurrent ordering, queue backpressure, version tracking, drain-on-stop; a test for the cancellation / future-hang scenario is missing.

Sequence Diagram

sequenceDiagram
    participant Caller as Agent / API Controller
    participant TE as TaskEngine
    participant Q as asyncio.Queue
    participant PL as _processing_loop
    participant P as PersistenceBackend
    participant MB as MessageBus

    Caller->>TE: submit(mutation)
    TE->>TE: check _running flag
    TE->>Q: put_nowait(MutationEnvelope)
    TE-->>Caller: await envelope.future

    PL->>Q: wait_for(get(), timeout=0.5)
    Q-->>PL: MutationEnvelope
    PL->>PL: _process_one(envelope)
    PL->>P: save / get / delete
    P-->>PL: result
    PL->>PL: envelope.future.set_result(TaskMutationResult)
    PL-->>Caller: TaskMutationResult (future resolved)

    opt publish_snapshots=True
        PL->>MB: publish(TaskStateChanged)
    end

    note over TE,PL: stop(): set _running=False, drain queue with timeout, then cancel task
Loading

Comments Outside Diff (5)

  1. src/ai_company/engine/task_engine.py, line 1459-1472 (link)

    In-flight mutation future left unresolved on drain timeout

    When stop() exhausts its drain timeout and cancels _processing_task, any mutation that is currently being awaited inside _process_one will be interrupted with asyncio.CancelledError. Because CancelledError is a BaseException, it is not caught by the except Exception handlers in either _process_one (lines 1488–1504) or _processing_loop (lines 1459–1472). The corresponding envelope.future is therefore never resolved.

    _fail_remaining_futures() only drains the queue — items that have already been dequeued for processing are not covered. Any caller awaiting that future will block indefinitely.

    A simple fix is to also handle BaseException (or at least CancelledError) in _process_one and set a failure result before re-raising:

            try:
                await self._process_one(envelope)
            except BaseException as exc:
                if not envelope.future.done():
                    envelope.future.set_result(
                        TaskMutationResult(
                            request_id=envelope.mutation.request_id,
                            success=False,
                            error="Processing interrupted",
                            error_code="internal",
                        ),
                    )
                raise
            except Exception:
                logger.exception(
                    TASK_ENGINE_LOOP_ERROR,
                    error="Unhandled exception in processing loop",
                )
                if not envelope.future.done():
                    envelope.future.set_result(
                        TaskMutationResult(
                            request_id=envelope.mutation.request_id,
                            success=False,
                            error="Internal error in processing loop",
                            error_code="internal",
                        ),
                    )
  2. src/ai_company/engine/task_engine_models.py, line 2006-2018 (link)

    created_at missing from immutable fields

    The PR description explicitly lists created_at as a field that UpdateTaskMutation and TransitionTaskMutation validators should reject — "reject writes to id, status, created_by, created_at, etc." — but created_at does not appear in either _IMMUTABLE_TASK_FIELDS or _IMMUTABLE_OVERRIDE_FIELDS.

    Because _apply_update merges raw mutation.updates over a task.model_dump() before calling Task.model_validate(merged), an update that passes {"created_at": ...} would silently overwrite the original creation timestamp, corrupting audit history.

  3. src/ai_company/engine/task_engine_models.py, line 2055-2062 (link)

    created_at also missing from transition override immutable fields

    Same gap as above — _IMMUTABLE_OVERRIDE_FIELDS does not include created_at, so TransitionTaskMutation.overrides can overwrite the task's creation timestamp.

  4. src/ai_company/api/app.py, line 183-194 (link)

    Cleanup failure logs use API_APP_STARTUP event

    Both the task engine and bridge cleanup failure paths in _cleanup_on_failure log exceptions under the API_APP_STARTUP event constant. This will make cleanup errors appear indistinguishable from normal startup events in log aggregation and dashboards. A more specific event constant (e.g. API_APP_STARTUP_CLEANUP_FAILED) would make these errors easier to triage. The same misleading event is used a few lines later for the bridge cleanup at line 191.

  5. src/ai_company/engine/task_engine.py, line 1075-1077 (link)

    In-memory version tracking is lost on process restart

    _versions is an in-memory dict[str, int] that is not persisted. After a server restart, all tasks that already exist in the persistence backend will have an implicit version of 0 from the engine's perspective. Any caller that stored a version number across the restart (e.g. from a previous API response) and later passes expected_version=N for N > 0 will receive a spurious TaskVersionConflictError. Conversely, a caller that passes expected_version=0 (or omits it) will always succeed, even for stale writes.

    Consider persisting the version counter alongside the task record (e.g. as a field on the Task model), or documenting clearly that optimistic concurrency guarantees do not survive restarts so callers know not to cache version numbers across process boundaries.

Prompt To Fix All With AI
This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 1459-1472

Comment:
**In-flight mutation future left unresolved on drain timeout**

When `stop()` exhausts its drain timeout and cancels `_processing_task`, any mutation that is currently being awaited inside `_process_one` will be interrupted with `asyncio.CancelledError`. Because `CancelledError` is a `BaseException`, it is **not** caught by the `except Exception` handlers in either `_process_one` (lines 1488–1504) or `_processing_loop` (lines 1459–1472). The corresponding `envelope.future` is therefore never resolved.

`_fail_remaining_futures()` only drains the **queue** — items that have already been dequeued for processing are not covered. Any caller awaiting that future will block indefinitely.

A simple fix is to also handle `BaseException` (or at least `CancelledError`) in `_process_one` and set a failure result before re-raising:

```python
        try:
            await self._process_one(envelope)
        except BaseException as exc:
            if not envelope.future.done():
                envelope.future.set_result(
                    TaskMutationResult(
                        request_id=envelope.mutation.request_id,
                        success=False,
                        error="Processing interrupted",
                        error_code="internal",
                    ),
                )
            raise
        except Exception:
            logger.exception(
                TASK_ENGINE_LOOP_ERROR,
                error="Unhandled exception in processing loop",
            )
            if not envelope.future.done():
                envelope.future.set_result(
                    TaskMutationResult(
                        request_id=envelope.mutation.request_id,
                        success=False,
                        error="Internal error in processing loop",
                        error_code="internal",
                    ),
                )
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: src/ai_company/engine/task_engine_models.py
Line: 2006-2018

Comment:
**`created_at` missing from immutable fields**

The PR description explicitly lists `created_at` as a field that `UpdateTaskMutation` and `TransitionTaskMutation` validators should reject — _"reject writes to `id`, `status`, `created_by`, `created_at`, etc."_ — but `created_at` does not appear in either `_IMMUTABLE_TASK_FIELDS` or `_IMMUTABLE_OVERRIDE_FIELDS`.

Because `_apply_update` merges raw `mutation.updates` over a `task.model_dump()` before calling `Task.model_validate(merged)`, an update that passes `{"created_at": ...}` would silently overwrite the original creation timestamp, corrupting audit history.

```suggestion
_IMMUTABLE_TASK_FIELDS: frozenset[str] = frozenset(
    {
        "id",
        "status",
        "created_by",
        "created_at",
    }
)
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: src/ai_company/engine/task_engine_models.py
Line: 2055-2062

Comment:
**`created_at` also missing from transition override immutable fields**

Same gap as above — `_IMMUTABLE_OVERRIDE_FIELDS` does not include `created_at`, so `TransitionTaskMutation.overrides` can overwrite the task's creation timestamp.

```suggestion
_IMMUTABLE_OVERRIDE_FIELDS: frozenset[str] = frozenset(
    {
        "id",
        "created_by",
        "created_at",
        "status",
    }
)
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: src/ai_company/api/app.py
Line: 183-194

Comment:
**Cleanup failure logs use `API_APP_STARTUP` event**

Both the task engine and bridge cleanup failure paths in `_cleanup_on_failure` log exceptions under the `API_APP_STARTUP` event constant. This will make cleanup errors appear indistinguishable from normal startup events in log aggregation and dashboards. A more specific event constant (e.g. `API_APP_STARTUP_CLEANUP_FAILED`) would make these errors easier to triage. The same misleading event is used a few lines later for the bridge cleanup at line 191.

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 1075-1077

Comment:
**In-memory version tracking is lost on process restart**

`_versions` is an in-memory `dict[str, int]` that is not persisted. After a server restart, all tasks that already exist in the persistence backend will have an implicit version of `0` from the engine's perspective. Any caller that stored a version number across the restart (e.g. from a previous API response) and later passes `expected_version=N` for N > 0 will receive a spurious `TaskVersionConflictError`. Conversely, a caller that passes `expected_version=0` (or omits it) will always succeed, even for stale writes.

Consider persisting the version counter alongside the task record (e.g. as a field on the `Task` model), or documenting clearly that optimistic concurrency guarantees do not survive restarts so callers know not to cache version numbers across process boundaries.

How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 3254142

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a centralized single-writer TaskEngine (queue-based) for task mutations, integrates it into the API and AgentEngine, and expands typed error handling, observability events, and unit tests to support the new coordination model.

Changes:

  • Add TaskEngine core implementation, config + Pydantic request/result/event models, and task-engine observability event constants.
  • Wire TaskEngine into API lifecycle (create_app startup/shutdown), AppState, and task CRUD/transition controller paths.
  • Expand unit tests for TaskEngine behavior, model validation/immutability, API lifecycle cleanup, and AgentEngine terminal-status reporting.

Reviewed changes

Copilot reviewed 22 out of 22 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
tests/unit/observability/test_events.py Adds task_engine to domain module discovery coverage.
tests/unit/engine/test_task_engine_models.py New tests for TaskEngine request/result/event model validation and immutability.
tests/unit/engine/test_task_engine.py New end-to-end unit tests for TaskEngine lifecycle, mutations, versions, queue backpressure, snapshots.
tests/unit/engine/test_agent_engine.py Adds tests for AgentEngine best-effort reporting to TaskEngine on terminal statuses.
tests/unit/api/test_state.py Adds tests for AppState.task_engine accessors and configuration behavior.
tests/unit/api/test_app.py Updates lifecycle helper signatures; adds startup/shutdown tests covering TaskEngine failure handling.
tests/unit/api/conftest.py Adds a TaskEngine fixture and passes it into create_app; converts test_client to a yielding fixture.
src/ai_company/observability/events/task_engine.py Introduces TaskEngine event name constants.
src/ai_company/engine/task_engine_models.py Adds mutation request models, mutation result model, and snapshot event model (+ validators).
src/ai_company/engine/task_engine_config.py Adds TaskEngine configuration model (queue sizing, drain timeout, snapshot publishing).
src/ai_company/engine/task_engine.py Implements the TaskEngine queue/loop, mutation application, version tracking, and snapshot publishing.
src/ai_company/engine/errors.py Adds TaskEngine-specific typed exceptions.
src/ai_company/engine/agent_engine.py Adds _report_to_task_engine() and terminal-status gating for best-effort final-state reporting.
src/ai_company/engine/init.py Re-exports TaskEngine API surface (engine, models, config, errors).
src/ai_company/config/schema.py Adds task_engine: TaskEngineConfig to root configuration.
src/ai_company/config/defaults.py Adds task_engine defaults section to generated config dict.
src/ai_company/api/state.py Adds TaskEngine storage/accessors to AppState.
src/ai_company/api/controllers/tasks.py Switches task CRUD/transition endpoints to use AppState.task_engine instead of direct repository writes.
src/ai_company/api/app.py Adds TaskEngine to lifecycle wiring with startup/shutdown + failure cleanup handling.
docs/design/engine.md Documents TaskEngine architecture and behavior in the engine design doc.
docs/architecture/tech-stack.md Marks “State coordination” as Adopted and references TaskEngine.
CLAUDE.md Updates engine package description and logging-event guidance to include TaskEngine.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/ai_company/engine/task_engine.py Outdated
)
try:
result = await self._apply_mutation(mutation)
envelope.future.set_result(result)
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

envelope.future.set_result(result) is unconditional. If the submitter task gets cancelled (e.g., via asyncio.wait_for(...) timeout), cancellation is typically propagated to the awaited Future, and set_result() will raise InvalidStateError. That exception will be treated as a mutation failure, and will also skip snapshot publication even though _apply_mutation() already succeeded and persisted state.

Consider guarding set_result() with if not envelope.future.done(): ... (and handling InvalidStateError separately) so caller cancellation can’t cause false failure logging or suppress snapshot publication for a successfully-applied mutation.

Suggested change
envelope.future.set_result(result)
try:
if not envelope.future.done():
envelope.future.set_result(result)
except asyncio.InvalidStateError:
# Future was already completed or cancelled (e.g., caller timeout);
# ignore to avoid treating this as a mutation failure.
pass

Copilot uses AI. Check for mistakes.
Comment thread src/ai_company/api/controllers/tasks.py Outdated
Comment on lines +201 to +206
task = await app_state.task_engine.transition_task(
task_id,
data.target_status,
requested_by="api",
reason=f"API transition to {data.target_status.value}",
assigned_to=data.assigned_to,
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assigned_to=data.assigned_to is always passed into TaskEngine.transition_task(). When assigned_to is None (omitted in the request), this will override/clear any existing assignee and can cause otherwise-valid transitions (e.g., ASSIGNED → IN_PROGRESS) to fail Task validation because IN_PROGRESS requires assigned_to.

Consider only passing assigned_to when it is not None (build an overrides dict and splat it), or have TaskEngine.transition_task() drop None overrides for optional fields.

Suggested change
task = await app_state.task_engine.transition_task(
task_id,
data.target_status,
requested_by="api",
reason=f"API transition to {data.target_status.value}",
assigned_to=data.assigned_to,
transition_kwargs: dict[str, object] = {
"requested_by": "api",
"reason": f"API transition to {data.target_status.value}",
}
if data.assigned_to is not None:
transition_kwargs["assigned_to"] = data.assigned_to
task = await app_state.task_engine.transition_task(
task_id,
data.target_status,
**transition_kwargs,

Copilot uses AI. Check for mistakes.
Comment thread src/ai_company/engine/task_engine.py Outdated
previous_status=task.status,
)

updated = task.model_copy(update=mutation.updates)
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task.with_transition() validates invariants via Task.model_validate(...), but the update path uses task.model_copy(update=mutation.updates), which does not run validators. This can persist invalid Task states (e.g., setting assigned_to while leaving status=CREATED, or clearing required fields) and may break later transitions/reads.

Consider rebuilding and re-validating the updated task (e.g., validate a merged payload) before persisting, so Task validators always run for updates.

Suggested change
updated = task.model_copy(update=mutation.updates)
merged_data = task.model_dump()
merged_data.update(mutation.updates)
updated = Task.model_validate(merged_data)

Copilot uses AI. Check for mistakes.
Comment thread src/ai_company/engine/task_engine.py Outdated
Comment on lines +418 to +424
@staticmethod
def _raise_typed_error(result: TaskMutationResult) -> None:
"""Raise a typed error from a failed mutation result."""
error = result.error or "Mutation failed"
if "not found" in error:
raise TaskNotFoundError(error)
raise TaskMutationError(error)
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_raise_typed_error() currently only maps "not found" errors to TaskNotFoundError and otherwise raises TaskMutationError. Version conflicts are converted into a failed TaskMutationResult earlier, but never raised as TaskVersionConflictError, which undermines the typed error hierarchy described in the PR (and used for controller handling).

Consider explicitly detecting version-conflict failures and raising TaskVersionConflictError (or propagating the typed exception instead of string-matching).

Copilot uses AI. Check for mistakes.
Comment thread docs/design/engine.md Outdated
Comment on lines +189 to +190
Callers can pass `expected_version` to detect stale writes;
`TaskVersionConflictError` is raised on mismatch.
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs say TaskVersionConflictError is raised on expected_version mismatch, but TaskEngine currently catches TaskVersionConflictError inside _apply_update/_apply_transition and returns a failed TaskMutationResult instead, and the convenience methods don’t raise TaskVersionConflictError either.

Either update the implementation to actually raise a typed TaskVersionConflictError to callers (at least in convenience methods), or adjust this doc section to match the current result-based API.

Suggested change
Callers can pass `expected_version` to detect stale writes;
`TaskVersionConflictError` is raised on mismatch.
Callers can pass `expected_version` to detect stale writes; on mismatch, the
engine returns a failed `TaskMutationResult` indicating a version conflict
instead of raising an exception.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/design/engine.md`:
- Around line 177-182: The fenced ASCII diagram block in docs/design/engine.md
is missing a language specifier (MD040); update the fenced code block that
contains "Agent / API  ──submit()──▶  asyncio.Queue  ──▶  _processing_loop  ──▶ 
Persistence" to use a plaintext specifier such as ```text or ```plaintext so the
diagram is treated as plain text (e.g., change the opening fence to ```text).

In `@src/ai_company/api/controllers/tasks.py`:
- Around line 215-222: The logger call in the TaskMutationError except block
incorrectly uses the TASK_STATUS_CHANGED event constant; update this to a
semantically correct event (either create/use API_TASK_TRANSITION_FAILED in
ai_company.observability.events.api or use an appropriate general API error
event constant) and keep the same payload (task_id and error_str); change the
logger.warning invocation in the except block that catches TaskMutationError
(where ApiValidationError is raised) to reference the new constant so the
failure path is logged with the correct event name.

In `@src/ai_company/engine/agent_engine.py`:
- Around line 102-110: The name and docstring for _TERMINAL_STATUSES are
misleading because it includes TaskStatus values (FAILED, INTERRUPTED) that the
TaskStatus enum marks as reassignable; update either the identifier or the
docstring: rename _TERMINAL_STATUSES to a clearer name like
_REPORTABLE_FINAL_STATUSES (and update all references) or change the docstring
near _TERMINAL_STATUSES to explain these are "final outcomes reported to
TaskEngine by AgentEngine" rather than true lifecycle-terminal states; ensure
references to TaskStatus (FAILED, INTERRUPTED, COMPLETED, CANCELLED) and any
uses in AgentEngine reporting logic are updated to reflect the new
name/description.

In `@src/ai_company/engine/task_engine_models.py`:
- Around line 83-94: _UPDATE: _IMMUTABLE_TASK_FIELDS currently lists timestamp
fields that don't exist on the Task model; update the frozenset in
_IMMUTABLE_TASK_FIELDS to only include the actual Task fields "id", "status",
and "created_by", removing "created_at", "updated_at", "started_at", and
"completed_at" (those belong to TaskExecution or are absent entirely) so the
immutable set matches the Task model.

In `@src/ai_company/engine/task_engine.py`:
- Around line 418-424: The current _raise_typed_error(result:
TaskMutationResult) uses brittle string matching on result.error; add a stable
error classification field (e.g., result.code or result.error_code of an
enum/enum-like string) to TaskMutationResult and update _raise_typed_error to
switch on that code to raise TaskNotFoundError, TaskMutationError, etc.; keep a
backward-compatible fallback to the existing "not found" substring check if code
is absent to avoid breaking callers.
- Around line 730-777: The cancel path (_apply_cancel) currently skips
optimistic concurrency checks because CancelTaskMutation and the public
cancel_task() API lack an expected_version; to make cancellations version-aware,
add an expected_version field to CancelTaskMutation and accept it on
cancel_task(), then in _apply_cancel fetch the current task version from
self._persistence.tasks.get, compare it to mutation.expected_version, and if
they differ return a conflict/failed TaskMutationResult (mirroring the behavior
in _apply_update/_apply_transition); only proceed to task.with_transition, save,
bump version (self._bump_version), and log the applied mutation when the
versions match.

In `@tests/unit/engine/test_task_engine.py`:
- Line 183: Remove the spurious type ignore on the await eng.stop(timeout=2.0)
call: the code path is reachable after assert eng.is_running is True and start()
succeeds, so delete the "# type: ignore[unreachable]" comment next to the
eng.stop invocation (referencing the await eng.stop(timeout=2.0) line and the
surrounding assert eng.is_running and start() usage) to avoid misleading future
readers.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 60fccc86-2283-4d3e-be11-6b51657d9441

📥 Commits

Reviewing files that changed from the base of the PR and between d1203e5 and ef4e7df.

📒 Files selected for processing (22)
  • CLAUDE.md
  • docs/architecture/tech-stack.md
  • docs/design/engine.md
  • src/ai_company/api/app.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/api/state.py
  • src/ai_company/config/defaults.py
  • src/ai_company/config/schema.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/task_engine_config.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/observability/events/task_engine.py
  • tests/unit/api/conftest.py
  • tests/unit/api/test_app.py
  • tests/unit/api/test_state.py
  • tests/unit/engine/test_agent_engine.py
  • tests/unit/engine/test_task_engine.py
  • tests/unit/engine/test_task_engine_models.py
  • tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Agent
  • GitHub Check: Greptile Review
  • GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649
Use except A, B: syntax (no parentheses) for exception handling — PEP 758 syntax enforced by ruff on Python 3.14
Add type hints to all public functions — enforce mypy strict mode
Use Google-style docstrings — required on public classes and functions, enforced by ruff D rules
For frozen Pydantic models with dict/list fields, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use Pydantic v2 BaseModel, model_validator, computed_field, and ConfigDict — avoid redundant stored fields with @computed_field
Use NotBlankStr (from core.types) for all identifier/name fields, including optional (NotBlankStr | None) and tuple variants, instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code instead of bare create_task
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly — never silently swallow exceptions
Enforce line length of 88 characters via ruff
Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using model_copy(update=...)) for runtime state that evolves
Never mix static config fields with mutable runtime fields in one model

Files:

  • src/ai_company/config/schema.py
  • src/ai_company/config/defaults.py
  • src/ai_company/engine/errors.py
  • tests/unit/engine/test_task_engine.py
  • tests/unit/api/test_state.py
  • tests/unit/engine/test_agent_engine.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/__init__.py
  • tests/unit/observability/test_events.py
  • src/ai_company/api/state.py
  • src/ai_company/api/app.py
  • src/ai_company/observability/events/task_engine.py
  • src/ai_company/engine/task_engine.py
  • tests/unit/api/conftest.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/api/controllers/tasks.py
  • tests/unit/api/test_app.py
  • src/ai_company/engine/task_engine_config.py
  • tests/unit/engine/test_task_engine_models.py
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic MUST have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging, logging.getLogger(), or print() in application code — use the observability logger instead
Always use logger as the variable name (not _logger or log)
Use event name constants from domain-specific modules under ai_company.observability.events (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget) — import directly
Use structured logging with kwargs: logger.info(EVENT, key=value) — never use string formatting like logger.info("msg %s", val)
Log all error paths at WARNING or ERROR with context before raising
Log all state transitions at INFO level
Use DEBUG level for object creation, internal flow, and entry/exit of key functions

Files:

  • src/ai_company/config/schema.py
  • src/ai_company/config/defaults.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/api/state.py
  • src/ai_company/api/app.py
  • src/ai_company/observability/events/task_engine.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/engine/task_engine_config.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases. Tests must use test-provider, test-small-001, etc.

Files:

  • src/ai_company/config/schema.py
  • src/ai_company/config/defaults.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/api/state.py
  • src/ai_company/api/app.py
  • src/ai_company/observability/events/task_engine.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/engine/task_engine_config.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Use @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow markers for test organization
Prefer @pytest.mark.parametrize for testing similar cases
Never use real vendor names in test files — use test-provider, test-small-001, and generic model names

Files:

  • tests/unit/engine/test_task_engine.py
  • tests/unit/api/test_state.py
  • tests/unit/engine/test_agent_engine.py
  • tests/unit/observability/test_events.py
  • tests/unit/api/conftest.py
  • tests/unit/api/test_app.py
  • tests/unit/engine/test_task_engine_models.py
🧠 Learnings (10)
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/providers/**/*.py : Set `RetryConfig` and `RateLimiterConfig` per-provider in `ProviderConfig`

Applied to files:

  • src/ai_company/config/schema.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`

Applied to files:

  • CLAUDE.md
  • src/ai_company/api/state.py
  • src/ai_company/api/app.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Never use `import logging`, `logging.getLogger()`, or `print()` in application code — use the observability logger instead

Applied to files:

  • CLAUDE.md
  • src/ai_company/api/state.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly

Applied to files:

  • CLAUDE.md
  • tests/unit/observability/test_events.py
  • src/ai_company/api/state.py
  • src/ai_company/api/app.py
  • src/ai_company/observability/events/task_engine.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Always use `logger` as the variable name (not `_logger` or `log`)

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use structured logging with kwargs: `logger.info(EVENT, key=value)` — never use string formatting like `logger.info("msg %s", val)`

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Log all error paths at WARNING or ERROR with context before raising

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Log all state transitions at INFO level

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use DEBUG level for object creation, internal flow, and entry/exit of key functions

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves

Applied to files:

  • src/ai_company/engine/task_engine_config.py
🧬 Code graph analysis (12)
src/ai_company/config/schema.py (3)
tests/unit/engine/test_task_engine.py (1)
  • engine (137-148)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
src/ai_company/api/state.py (1)
  • task_engine (107-109)
tests/unit/engine/test_task_engine.py (4)
src/ai_company/engine/errors.py (4)
  • TaskEngineNotRunningError (89-90)
  • TaskEngineQueueFullError (93-94)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
src/ai_company/engine/task_engine.py (4)
  • TaskEngine (80-871)
  • stop (138-175)
  • submit (198-233)
  • _MutationEnvelope (67-77)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
src/ai_company/engine/parallel_models.py (1)
  • task_id (87-89)
tests/unit/api/test_state.py (3)
src/ai_company/api/state.py (3)
  • task_engine (107-109)
  • has_task_engine (112-114)
  • set_task_engine (116-131)
src/ai_company/api/errors.py (1)
  • ServiceUnavailableError (68-74)
tests/unit/engine/test_task_engine.py (1)
  • engine (137-148)
tests/unit/engine/test_agent_engine.py (1)
src/ai_company/engine/errors.py (1)
  • TaskMutationError (97-98)
src/ai_company/engine/agent_engine.py (4)
src/ai_company/engine/errors.py (1)
  • TaskMutationError (97-98)
src/ai_company/api/state.py (1)
  • task_engine (107-109)
src/ai_company/engine/task_engine.py (2)
  • TaskEngine (80-871)
  • transition_task (306-350)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/api/state.py (2)
tests/unit/engine/test_task_engine.py (1)
  • engine (137-148)
src/ai_company/engine/task_engine.py (1)
  • TaskEngine (80-871)
src/ai_company/api/app.py (2)
src/ai_company/api/state.py (3)
  • task_engine (107-109)
  • persistence (87-89)
  • AppState (22-153)
src/ai_company/engine/task_engine.py (3)
  • TaskEngine (80-871)
  • stop (138-175)
  • start (118-136)
src/ai_company/engine/task_engine.py (5)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/engine/errors.py (5)
  • TaskEngineNotRunningError (89-90)
  • TaskEngineQueueFullError (93-94)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
  • TaskVersionConflictError (105-106)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
src/ai_company/engine/task_engine_models.py (8)
  • CancelTaskMutation (201-218)
  • CreateTaskData (20-59)
  • CreateTaskMutation (65-80)
  • DeleteTaskMutation (183-198)
  • TaskMutationResult (234-267)
  • TaskStateChanged (273-310)
  • TransitionTaskMutation (142-180)
  • UpdateTaskMutation (97-128)
src/ai_company/persistence/protocol.py (1)
  • PersistenceBackend (27-167)
src/ai_company/engine/task_engine_models.py (2)
src/ai_company/core/enums.py (4)
  • Complexity (247-253)
  • Priority (238-244)
  • TaskStatus (198-224)
  • TaskType (227-235)
src/ai_company/core/task.py (1)
  • Task (45-261)
src/ai_company/api/controllers/tasks.py (5)
src/ai_company/core/task.py (1)
  • Task (45-261)
src/ai_company/engine/errors.py (2)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
src/ai_company/engine/task_engine_models.py (1)
  • CreateTaskData (20-59)
src/ai_company/api/state.py (1)
  • task_engine (107-109)
src/ai_company/engine/task_engine.py (4)
  • create_task (235-265)
  • update_task (267-304)
  • transition_task (306-350)
  • delete_task (352-380)
tests/unit/api/test_app.py (2)
src/ai_company/api/app.py (2)
  • _safe_startup (213-281)
  • _safe_shutdown (284-322)
src/ai_company/api/state.py (2)
  • persistence (87-89)
  • AppState (22-153)
tests/unit/engine/test_task_engine_models.py (2)
src/ai_company/core/enums.py (4)
  • Complexity (247-253)
  • Priority (238-244)
  • TaskStatus (198-224)
  • TaskType (227-235)
src/ai_company/engine/task_engine_models.py (7)
  • CreateTaskData (20-59)
  • CreateTaskMutation (65-80)
  • DeleteTaskMutation (183-198)
  • TaskMutationResult (234-267)
  • TaskStateChanged (273-310)
  • TransitionTaskMutation (142-180)
  • UpdateTaskMutation (97-128)
🪛 LanguageTool
CLAUDE.md

[style] ~130-~130: A comma is missing here.
Context: ...nder ai_company.observability.events (e.g. PROVIDER_CALL_START from `events.prov...

(EG_NO_COMMA)

docs/architecture/tech-stack.md

[style] ~122-~122: Consider using the typographical ellipsis character here instead.
Context: ... Agents submit requests; engine applies model_copy(update=...) sequentially and publishes snapshots....

(ELLIPSIS)

docs/design/engine.md

[style] ~186-~186: Consider using the typographical ellipsis character here instead.
Context: ...mmutable updates**: Each mutation calls model_copy(update=...) on frozen Task models — the origi...

(ELLIPSIS)

🪛 markdownlint-cli2 (0.21.0)
docs/design/engine.md

[warning] 177-177: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (47)
src/ai_company/config/defaults.py (1)

43-43: LGTM!

The empty dict default for task_engine is appropriate since TaskEngineConfig in schema.py uses default_factory=TaskEngineConfig, which provides sensible defaults for all fields.

docs/design/engine.md (1)

169-221: LGTM!

The TaskEngine architecture documentation is comprehensive, accurately describes the single-writer pattern, mutation types, error handling semantics, and lifecycle management. The content aligns well with the implementation in task_engine.py.

src/ai_company/engine/errors.py (1)

83-106: LGTM!

The exception hierarchy is well-designed:

  • TaskEngineError as base extends EngineError for consistency with existing engine errors
  • Operational errors (TaskEngineNotRunningError, TaskEngineQueueFullError) are separate from mutation errors
  • TaskMutationError with specific subclasses (TaskNotFoundError, TaskVersionConflictError) enables precise error handling in API controllers
src/ai_company/engine/task_engine_config.py (1)

1-37: LGTM!

The TaskEngineConfig model follows best practices:

  • Frozen Pydantic model for immutable configuration
  • Sensible field constraints (ge=0 for unbounded queue, gt=0, le=300 for timeout)
  • Clear docstring documenting field semantics including the 0 = unbounded convention
  • allow_inf_nan=False prevents invalid float values

Based on learnings: "Config values should use frozen Pydantic models for immutable config/identity."

src/ai_company/config/schema.py (2)

24-24: LGTM!

Import follows the established pattern for config model imports.


526-529: LGTM!

The task_engine field integration follows the established pattern used by other config sections (e.g., graceful_shutdown, task_assignment):

  • Uses default_factory=TaskEngineConfig for proper instantiation
  • Includes description for documentation
  • Docstring updated at line 418
src/ai_company/observability/events/task_engine.py (1)

1-19: LGTM!

The event constants module is well-designed:

  • Follows established naming conventions (TASK_ENGINE_* with dotted string values)
  • Comprehensive coverage of all TaskEngine observability points: lifecycle, mutations, snapshots, drain, and errors
  • Uses Final[str] typing consistent with other event modules

Based on learnings: "Use event name constants from domain-specific modules under ai_company.observability.events."

src/ai_company/engine/task_engine.py (5)

66-77: LGTM!

The _MutationEnvelope dataclass correctly pairs mutations with their response futures. The docstring appropriately documents the requirement that instantiation must occur within a running event loop.


118-189: LGTM!

The lifecycle methods are well-implemented:

  • start() prevents double-start with proper error handling
  • stop() implements graceful drain with configurable timeout, proper cancellation handling, and cleanup of remaining futures
  • _fail_remaining_futures() correctly handles edge cases with QueueEmpty suppression

464-510: LGTM!

The processing loop is well-designed:

  • Correctly implements drain behavior (while self._running or not self._queue.empty())
  • _process_one sanitizes internal exceptions to prevent leaking SQL paths/stack traces per PR objectives
  • Exception logging includes full traceback via logger.exception for debugging while returning sanitized message to callers

811-824: LGTM!

The deferred imports with inline comment explaining the circular dependency reason (communication -> engine -> communication) is a reasonable pattern for breaking the cycle. The imports are properly annotated with noqa: PLC0415.


841-871: LGTM!

Version tracking implementation is clean:

  • _bump_version handles first-version case correctly (defaults to 0, then increments)
  • _check_version logs the conflict with full context before raising
src/ai_company/engine/task_engine_models.py (3)

20-59: LGTM!

CreateTaskData is well-designed:

  • Uses NotBlankStr for all identifier/name fields per coding guidelines
  • Field constraints (ge=0.0 for budget_limit, allow_inf_nan=False) prevent invalid values
  • Defaults align with Task model (Priority.MEDIUM, Complexity.MEDIUM)

259-267: LGTM!

The _check_consistency validator is a good defensive pattern ensuring TaskMutationResult invariants:

  • Successful results must not carry an error
  • Failed results must carry an error description

This prevents callers from receiving ambiguous results.


273-309: LGTM!

TaskStateChanged event model is well-designed:

  • Uses AwareDatetime for timezone-aware timestamps
  • Appropriate nullable fields for edge cases (delete has no task/new_status, create has no previous_status)
  • mutation_type as Literal union ensures type safety
CLAUDE.md (2)

95-95: LGTM!

The engine description update accurately reflects the new TaskEngine functionality added to the codebase.


130-130: LGTM!

The TASK_ENGINE_STARTED event constant is documented consistently with the existing event naming patterns.

src/ai_company/engine/__init__.py (1)

137-149: LGTM!

The public API surface is correctly expanded to include TaskEngine, TaskEngineConfig, and all task mutation types. The imports and __all__ entries are consistent and properly organized.

src/ai_company/api/state.py (1)

106-131: LGTM!

The task_engine property, has_task_engine check, and set_task_engine method follow the established patterns used by other services (e.g., auth_service). Error logging before raising is correctly implemented.

src/ai_company/api/controllers/tasks.py (2)

116-136: LGTM!

The create flow correctly builds CreateTaskData and delegates to the engine. Logging the created task is appropriate.


160-173: LGTM!

The update flow properly catches TaskNotFoundError, logs with context, and translates to NotFoundError for the API response.

src/ai_company/api/app.py (3)

262-271: LGTM!

The TaskEngine startup is correctly placed after persistence and message bus initialization, which ensures the engine can use both for mutations and snapshot publishing. The synchronous start() call matches the TaskEngine API.


299-306: LGTM!

The TaskEngine shutdown is correctly placed in the reverse order of startup — after bridge but before message bus, ensuring pending mutations can still publish snapshots during drain.


145-152: LGTM!

The cleanup-on-failure logic correctly reverses TaskEngine startup by stopping it before cleaning up the message bus and persistence.

docs/architecture/tech-stack.md (1)

122-130: LGTM!

The documentation accurately reflects the adoption of the centralized TaskEngine for state coordination and documents the additional engineering conventions introduced in this PR.

tests/unit/observability/test_events.py (1)

213-213: LGTM!

The task_engine domain module is correctly added to the expected set, ensuring the new observability events are discovered by the module scan.

tests/unit/api/conftest.py (2)

627-644: LGTM!

The test_client fixture correctly injects the fake_task_engine into the app and uses the context manager pattern, which should trigger lifecycle hooks. The default CEO auth header setup is preserved.


609-616: No issue found — code lifecycle pattern is correct.

The fake_task_engine fixture correctly creates an unstarted TaskEngine. When TestClient enters its context manager (line 641), Litestar triggers the app's on_startup hook, which calls task_engine.start() at src/ai_company/api/app.py:264. Tests will not encounter TaskEngineNotRunningError because the engine is started before any test code runs.

tests/unit/api/test_state.py (1)

95-136: LGTM!

The new TestAppStateTaskEngine test class provides comprehensive coverage for the task_engine property, has_task_engine, and set_task_engine methods. The tests follow the established patterns for other service accessors in this file (e.g., auth_service), and using MagicMock is appropriate here since these tests validate AppState behavior rather than TaskEngine internals.

tests/unit/api/test_app.py (3)

66-66: Signature update aligns with TaskEngine integration.

The additional None argument correctly reflects the new task_engine parameter position in _safe_startup.


84-84: Signature update aligns with TaskEngine integration.

The additional None argument correctly reflects the new task_engine parameter position in _safe_shutdown.


86-130: LGTM! Good coverage for TaskEngine lifecycle failure paths.

The new tests appropriately verify:

  1. test_task_engine_failure_cleans_up: When task engine start fails, both persistence and bus are properly cleaned up (defensive rollback behavior).
  2. test_shutdown_task_engine_failure_does_not_propagate: Shutdown suppresses task engine stop failures to ensure graceful shutdown continues.

The mock types are correct—MagicMock for synchronous start() and AsyncMock for asynchronous stop().

tests/unit/engine/test_agent_engine.py (2)

17-17: Import addition aligns with error handling needs.

The TaskMutationError import supports the new _report_to_task_engine exception handling that distinguishes between mutation errors (warning) and unexpected errors (error).


936-1091: LGTM! Comprehensive test coverage for TaskEngine reporting.

The TestReportToTaskEngine suite effectively validates:

  1. No-op behavior when task_engine is None
  2. Non-terminal statuses (e.g., IN_PROGRESS) are correctly skipped
  3. Terminal statuses (e.g., COMPLETED) trigger exactly one transition_task call
  4. TaskMutationError is swallowed (best-effort semantics)
  5. Unexpected exceptions are swallowed (resilience)

Good choice setting recovery_strategy=None in the non-terminal test to isolate the reporting behavior from recovery side effects.

tests/unit/engine/test_task_engine.py (6)

1-29: LGTM! Well-organized test module with comprehensive imports.

The imports cover all necessary mutation types, error types, and configuration classes needed for thorough TaskEngine testing.


33-103: Good minimal fake implementations for isolation.

The FakeTaskRepository, FakePersistence, FakeMessageBus, and FailingMessageBus classes provide sufficient functionality to test TaskEngine behavior without coupling to real implementations. The FailingMessageBus is a nice touch for testing snapshot publish failure resilience.


136-165: Well-designed fixtures with proper teardown.

The engine and engine_with_bus fixtures correctly handle lifecycle by starting the engine and stopping it on teardown. Using AsyncGenerator return type is correct for async fixtures with cleanup.


739-770: Queue full test accesses internal state—acceptable for this edge case.

Directly manipulating eng._running and eng._queue is necessary to test backpressure without a running processing loop consuming items. The cleanup at line 770 prevents leaking state.


780-808: Good error propagation test with inline fake.

The FailingSaveRepo nested class cleanly tests that persistence errors are captured in the result rather than propagating as exceptions.


960-997: Effective immutable field rejection tests.

These tests validate that UpdateTaskMutation and TransitionTaskMutation correctly reject attempts to modify immutable fields (status, id) via their Pydantic validators.

tests/unit/engine/test_task_engine_models.py (3)

1-119: LGTM! Thorough model construction and validation tests.

The tests for CreateTaskData and CreateTaskMutation effectively cover:

  • Minimal and full construction with default verification
  • Input validation (blank title, negative budget)
  • Frozen model immutability

122-224: LGTM! Good coverage for mutation models.

Tests for UpdateTaskMutation, TransitionTaskMutation, DeleteTaskMutation, and CancelTaskMutation verify construction, mutation type literals, optional fields (expected_version, overrides), and validation constraints.


226-312: LGTM! Result and event model tests are comprehensive.

The TaskMutationResult tests cover success/failure paths and immutability. The TaskStateChanged tests validate creation, transition, and delete event scenarios, plus JSON serialization roundtrip for API compatibility.

src/ai_company/engine/agent_engine.py (4)

22-22: Import addition for error handling.

The TaskMutationError import enables differentiated exception handling in _report_to_task_engine.


151-173: LGTM! Clean TaskEngine integration.

The optional task_engine parameter follows the established pattern for optional dependencies (e.g., cost_tracker, approval_store), and storing it as self._task_engine is consistent with other instance attributes.


356-373: Docstring and call placement are correct.

The updated docstring accurately reflects the new TaskEngine reporting step, and placing the call after _apply_post_execution_transitions ensures the final status is available for reporting.


661-713: LGTM! Robust best-effort reporting implementation.

The _report_to_task_engine method correctly:

  1. Short-circuits when no TaskEngine is configured
  2. Skips non-terminal statuses
  3. Distinguishes TaskMutationError (warning) from unexpected exceptions (error with divergence note)
  4. Propagates MemoryError/RecursionError per project convention
  5. Uses structured logging with appropriate levels

The "state may be divergent" message in the error case is a helpful operational hint.

Comment thread docs/design/engine.md Outdated
Comment thread src/ai_company/api/controllers/tasks.py
Comment thread src/ai_company/engine/agent_engine.py Outdated
Comment thread src/ai_company/engine/task_engine_models.py Outdated
Comment thread src/ai_company/engine/task_engine.py Outdated
Comment on lines +730 to +777
async def _apply_cancel(
self,
mutation: CancelTaskMutation,
) -> TaskMutationResult:
"""Cancel a task (shortcut for transition to CANCELLED)."""
task = await self._persistence.tasks.get(mutation.task_id)
if task is None:
return self._not_found_result(
"cancel",
mutation.request_id,
mutation.task_id,
)

previous_status = task.status
try:
updated = task.with_transition(TaskStatus.CANCELLED)
except ValueError as exc:
logger.warning(
TASK_ENGINE_MUTATION_FAILED,
mutation_type="cancel",
request_id=mutation.request_id,
task_id=mutation.task_id,
error=str(exc),
)
return TaskMutationResult(
request_id=mutation.request_id,
success=False,
error=str(exc),
)

await self._persistence.tasks.save(updated)
version = self._bump_version(mutation.task_id)

logger.info(
TASK_ENGINE_MUTATION_APPLIED,
mutation_type="cancel",
request_id=mutation.request_id,
task_id=mutation.task_id,
from_status=previous_status.value,
to_status=TaskStatus.CANCELLED.value,
)
return TaskMutationResult(
request_id=mutation.request_id,
success=True,
task=updated,
version=version,
previous_status=previous_status,
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if CancelTaskMutation has expected_version field
ast-grep --pattern 'class CancelTaskMutation($$$) {
  $$$
  expected_version$$$
  $$$
}'

# Check how cancel_task is called in tests to understand expected behavior
rg -n 'cancel_task|CancelTaskMutation' --type py -g '*test*' -A 3

Repository: Aureliolo/synthorg

Length of output: 2487


🏁 Script executed:

# Find and inspect the mutation model definitions
fd -e py -path "*/engine/*" | grep -E "model|mutation" | head -20

Repository: Aureliolo/synthorg

Length of output: 230


🏁 Script executed:

# Look at the CancelTaskMutation definition
rg -n "class CancelTaskMutation" -A 15 --type py

Repository: Aureliolo/synthorg

Length of output: 1448


🏁 Script executed:

# Check UpdateTaskMutation and TransitionTaskMutation definitions
rg -n "class (Update|Transition)TaskMutation" -A 15 --type py

Repository: Aureliolo/synthorg

Length of output: 2791


🏁 Script executed:

# Verify if _apply_update calls _check_version
rg -n "_apply_update|_check_version" -B 2 -A 10 --type py src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 2456


🏁 Script executed:

# Check the cancel_task method signature
rg -n "def cancel_task" -A 10 --type py src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 371


_apply_cancel intentionally omits optimistic concurrency checks; consider if consistency matters.

Unlike _apply_update and _apply_transition, _apply_cancel neither accepts nor validates an expected_version. This is by design: CancelTaskMutation lacks the field entirely, and the public cancel_task() API accepts no version parameter. Cancellations are simpler "shortcut" operations that always succeed regardless of task version.

This design choice is consistent with the implementation (no version checking, no field definition, no API parameter), but it does create asymmetry in the mutation protocol. If future requirements demand version-aware cancellations (e.g., to prevent race conditions), extending both the model and method would require minimal changes. Until then, this asymmetry is acceptable but worth documenting explicitly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_engine.py` around lines 730 - 777, The cancel path
(_apply_cancel) currently skips optimistic concurrency checks because
CancelTaskMutation and the public cancel_task() API lack an expected_version; to
make cancellations version-aware, add an expected_version field to
CancelTaskMutation and accept it on cancel_task(), then in _apply_cancel fetch
the current task version from self._persistence.tasks.get, compare it to
mutation.expected_version, and if they differ return a conflict/failed
TaskMutationResult (mirroring the behavior in _apply_update/_apply_transition);
only proceed to task.with_transition, save, bump version (self._bump_version),
and log the applied mutation when the versions match.

Comment thread tests/unit/engine/test_task_engine.py Outdated
assert eng.is_running is False
eng.start()
assert eng.is_running is True
await eng.stop(timeout=2.0) # type: ignore[unreachable]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Spurious # type: ignore[unreachable] comment.

The code after assert eng.is_running is True is reachable—the assert doesn't raise since start() succeeds. This type ignore appears unnecessary and may confuse future readers.

🔧 Suggested fix
         eng.start()
         assert eng.is_running is True
-        await eng.stop(timeout=2.0)  # type: ignore[unreachable]
+        await eng.stop(timeout=2.0)
         assert eng.is_running is False
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await eng.stop(timeout=2.0) # type: ignore[unreachable]
eng.start()
assert eng.is_running is True
await eng.stop(timeout=2.0)
assert eng.is_running is False
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_task_engine.py` at line 183, Remove the spurious type
ignore on the await eng.stop(timeout=2.0) call: the code path is reachable after
assert eng.is_running is True and start() succeeds, so delete the "# type:
ignore[unreachable]" comment next to the eng.stop invocation (referencing the
await eng.stop(timeout=2.0) line and the surrounding assert eng.is_running and
start() usage) to avoid misleading future readers.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is an excellent pull request that introduces a robust, centralized TaskEngine for state management. The changes significantly improve the architecture by decoupling components and providing a single source of truth for task state. The introduction of a typed error hierarchy, comprehensive model validation, and extensive test coverage are all major steps forward for the codebase's quality and maintainability. I've found one critical syntax error and one area for improvement in the error handling design.

Comment on lines +695 to +696
except MemoryError, RecursionError:
raise
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This except syntax is for Python 2. In Python 3, this will raise a SyntaxError. To catch multiple exceptions, you should group them in a tuple.

        except (MemoryError, RecursionError):
            raise

Comment thread src/ai_company/engine/task_engine.py Outdated
Comment on lines +419 to +424
def _raise_typed_error(result: TaskMutationResult) -> None:
"""Raise a typed error from a failed mutation result."""
error = result.error or "Mutation failed"
if "not found" in error:
raise TaskNotFoundError(error)
raise TaskMutationError(error)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Relying on string matching (if "not found" in error:) to determine which typed exception to raise is fragile and incomplete. For instance, a version conflict error will be wrapped in a generic TaskMutationError instead of the more specific TaskVersionConflictError, which seems to contradict the design goals mentioned in the documentation.

A more robust approach would be to add a machine-readable error code to TaskMutationResult in task_engine_models.py. This would allow for reliable dispatching to the correct exception type without string parsing.

For example, you could modify TaskMutationResult:

# in src/ai_company/engine/task_engine_models.py
class TaskMutationResult(BaseModel):
    # ...
    error_code: str | None = Field(default=None)

Then, you could refactor this method to use the error_code.

Suggested change
def _raise_typed_error(result: TaskMutationResult) -> None:
"""Raise a typed error from a failed mutation result."""
error = result.error or "Mutation failed"
if "not found" in error:
raise TaskNotFoundError(error)
raise TaskMutationError(error)
def _raise_typed_error(result: TaskMutationResult) -> None:
"""Raise a typed error from a failed mutation result."""
error = result.error or "Mutation failed"
match result.error_code:
case "not_found":
raise TaskNotFoundError(error)
case "version_conflict":
raise TaskVersionConflictError(error)
case _:
raise TaskMutationError(error)

- Add error_code discriminator to TaskMutationResult (not_found/version_conflict/validation/internal)
- Fix _raise_typed_error to use error_code match instead of fragile string matching
- Fix _processing_loop outer catch to resolve envelope future (prevents caller deadlock)
- Guard happy-path set_result in _process_one with done() check
- Fix _apply_update to use Task.model_validate() instead of model_copy() (runs validators)
- Clean _IMMUTABLE_TASK_FIELDS: remove 4 non-existent timestamp fields (only id/status/created_by)
- Rename _TERMINAL_STATUSES → _REPORTABLE_STATUSES (FAILED/INTERRUPTED are not strictly terminal)
- Fix assigned_to=None override bug in transition_task controller
- Add from_status to task transition audit log
- Fix failure log to use API_TASK_TRANSITION_FAILED event instead of TASK_STATUS_CHANGED
- Map TaskEngineNotRunningError/TaskEngineQueueFullError → ServiceUnavailableError (503)
- Fix create_task missing error handling
- Fix _on_expire broad exception handler to re-raise MemoryError/RecursionError
- Add MemoryError/RecursionError re-raise to _publish_snapshot
- Add API_TASK_TRANSITION_FAILED event constant
- Fix AppState docstring and set_task_engine docstring
- Add version_conflict test to TestTypedErrors
- Add TestDrainTimeout: verify abandoned futures resolved on _fail_remaining_futures
- Add TestMutationResultConsistency: validate success/error invariants enforced by Pydantic
- Add test_memory_error_propagates to TestReportToTaskEngine
- Fix engine.md: text lang specifier, version conflict description, _IMMUTABLE_TASK_FIELDS, asyncio.wait
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/ai_company/api/app.py (1)

286-309: ⚠️ Potential issue | 🟠 Major

Drain the TaskEngine before stopping the bridge.

task_engine.stop() drains queued mutations and can still publish final TaskStateChanged snapshots. Stopping the bridge first means those shutdown-time events never reach WebSocket subscribers even though the bus is still alive. Reverse the order so the engine drains while the bridge is still subscribed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/api/app.py` around lines 286 - 309, The shutdown sequence in
_safe_shutdown stops the MessageBusBridge (bridge.stop()) before the TaskEngine
(task_engine.stop()), which prevents TaskStateChanged snapshots emitted during
engine drain from reaching WebSocket subscribers; change the order so you await
task_engine.stop() while the bridge is still running, then stop the bridge
(i.e., call task_engine.stop() before bridge.stop() in _safe_shutdown), keeping
the existing try/except logging behavior for both operations.
src/ai_company/engine/agent_engine.py (1)

339-344: ⚠️ Potential issue | 🟠 Major

Report the final recovered state, not the pre-recovery result.

_report_to_task_engine() runs before _apply_recovery(), so TerminationReason.ERROR results are still IN_PROGRESS here and get skipped. Exceptions that go through _handle_fatal_error() also bypass _post_execution_pipeline() entirely, so their recovered FAILED state is never reported either. Move reporting to the final post-recovery ExecutionResult, or centralize it after both execution paths produce their final result.

Also applies to: 354-404

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/agent_engine.py` around lines 339 - 344, The code
currently calls _report_to_task_engine() before recovery so results with
TerminationReason.ERROR remain IN_PROGRESS and fatal exceptions handled by
_handle_fatal_error() never report their final FAILED state; update the flow so
reporting uses the post-recovery ExecutionResult (i.e., call
_report_to_task_engine() only after _post_execution_pipeline()/_apply_recovery()
or centralize a single reporting step that runs after both the normal and
fatal-error paths produce their final ExecutionResult), ensuring functions
_post_execution_pipeline, _apply_recovery, and _handle_fatal_error all funnel
into that single reporting point and that the reported state reflects the
recovered status.
src/ai_company/api/controllers/tasks.py (1)

76-80: ⚠️ Potential issue | 🟠 Major

Read-through TaskEngine calls can still bypass your sanitization path.

TaskEngine.get_task() / list_tasks() bypass the queue and hit persistence directly. These controller paths call them without wrapping unexpected backend errors, and the transition pre-read happens before the mutation try block, so a read failure can surface as an unwrapped server error. That extra pre-read is also racy for from_status; if you need it, expose previous_status from the engine instead of reading out-of-band.

Also applies to: 103-107, 223-224

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/api/controllers/tasks.py` around lines 76 - 80, Controller
code calls TaskEngine.get_task() and list_tasks() directly (e.g., the calls to
app_state.task_engine.list_tasks and get_task) which bypasses the
queued/sanitized mutation path and perform an out‑of‑band pre-read for
previous/from_status; instead route reads through the same engine queue or add
exception handling around these calls to convert backend errors into controlled
HTTP errors, and remove the separate pre-read for previous_status by adding and
using a previous_status (or from_status) field returned by TaskEngine methods so
the controller no longer performs a separate persistence read.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/api/app.py`:
- Around line 137-145: The startup rollback leaks the MessageBusBridge because
_cleanup_on_failure currently lacks a bridge / started_bridge parameter and thus
never stops an already-started bridge; modify _cleanup_on_failure to accept a
bridge: MessageBusBridge | None and started_bridge: bool, and when
started_bridge is true await bridge.stop() (or the bridge's async shutdown
method) during cleanup; update all callers of _cleanup_on_failure (the boot
error paths that call it after bridge.start() and task_engine.start() failures)
to pass the new bridge and started_bridge flag so the bridge is shut down on
failure.

In `@src/ai_company/api/controllers/tasks.py`:
- Around line 137-147: The handlers calling app_state.task_engine (create_task,
update_task, transition_task — and ensure delete_task too) should not map the
generic TaskMutationError to ApiValidationError; instead catch TaskMutationError
and convert it to a 5xx-level error (e.g., raise ServiceUnavailableError or an
InternalServerError) or route it through an error_code-based mapper so backend
faults return 5xx, while keeping ApiValidationError for typed
business/validation errors only; update the exception blocks around
app_state.task_engine.create_task (and the similar blocks in update_task,
transition_task, delete_task) to remove the TaskMutationError ->
ApiValidationError mapping and replace it with the appropriate server error
mapping.

In `@src/ai_company/engine/agent_engine.py`:
- Around line 690-699: AgentEngine currently calls
self._task_engine.transition_task(...) directly to COMPLETED which violates
Task.with_transition() allowed paths; update the code around the try block that
calls _task_engine.transition_task to instead fetch the task's current persisted
status (via the TaskEngine read API) and emit the required staged transitions
(e.g., ASSIGNED -> IN_PROGRESS, IN_PROGRESS -> IN_REVIEW, IN_REVIEW ->
COMPLETED) by calling transition_task for each intermediate status in order,
preserving the same requested_by and reason (use
execution_result.termination_reason.value); alternatively, if you prefer a
single-shot change, add a TaskEngine API that accepts the already-computed final
snapshot and use that from AgentEngine so the TaskEngine can validate/apply the
full state transition atomically.

In `@src/ai_company/engine/task_engine_models.py`:
- Around line 132-141: Remove the nonexistent field from the immutable-overrides
set: edit the _IMMUTABLE_OVERRIDE_FIELDS constant to drop "created_at" so it
only contains the actual Task model fields ("id", "created_by", "status");
locate the declaration of _IMMUTABLE_OVERRIDE_FIELDS in task_engine_models.py
and update the frozenset to remove "created_at", leaving the other entries
unchanged and ensuring any comments/docstrings remain accurate.

In `@src/ai_company/engine/task_engine.py`:
- Around line 177-190: In _fail_remaining_futures, the TaskMutationResult
returned for unprocessed envelopes is missing error_code; update the
TaskMutationResult creation inside _fail_remaining_futures to include
error_code="internal" (matching other internal failure paths in
_processing_loop) so that _raise_typed_error receives consistent error payloads
for envelope.future.set_result calls.
- Around line 697-710: The ValueError handler for invalid transitions should
mark the failure as a validation error: in the except ValueError as exc block
update the logger.warning call (TASK_ENGINE_MUTATION_FAILED) to include
error_code="validation" and return the TaskMutationResult with
error_code="validation" (alongside request_id, success=False, error=str(exc)) so
callers and _raise_typed_error can distinguish validation failures from internal
errors.
- Around line 773-786: The invalid cancel transition handler in TaskEngine
currently logs and returns a TaskMutationResult without an error_code; update
the except ValueError block in the cancel mutation handling (the code
surrounding mutation_type="cancel") to include error_code="validation" in both
the logger.warning call and the returned TaskMutationResult (use the same
pattern applied in _apply_transition) so the error is labeled consistently as a
validation failure.

In `@tests/unit/engine/test_task_engine.py`:
- Around line 702-730: Modify test_pending_mutations_processed to exercise
stop()-time draining by submitting create_task calls without awaiting them
(collect the returned futures from TaskEngine.create_task), then call
eng.stop(timeout=5.0) while those futures are still pending, then await
asyncio.gather on the collected futures to ensure they resolved and finally
check persistence.tasks.list_tasks() has 5 entries; keep references to
TaskEngine.start, TaskEngine.create_task, TaskEngine.stop, and
persistence.tasks.list_tasks when locating where to change the test.
- Around line 1066-1108: The test currently bypasses the timeout cleanup by
setting eng._running = False and calling eng._fail_remaining_futures() directly;
instead make the engine think processing is still active by leaving _running
True and forcing eng._processing_task to remain pending (e.g., assign a
not-yet-completed asyncio.Future or a task that awaits an Event) so the engine
must take the timeout branch when stop(timeout=...) is called; then call await
eng.stop(timeout=0.01) and assert the envelope.future was resolved with failure
and contains "shut down" in the error. Ensure you reference the TaskEngine
instance (eng), the _processing_task, stop(timeout=...), and the envelope.future
in the test changes.
- Around line 1-1140: The test module exceeds the 800-line limit and should be
split into smaller files — extract groups of tests (e.g. lifecycle →
tests/unit/engine/test_task_engine_lifecycle.py,
create/update/transition/delete/cancel/read-through →
test_task_engine_mutations.py,
publishing/sequential/drain/queue/errors/versioning/config →
test_task_engine_integration.py) and move shared fakes/fixtures
(FakeTaskRepository, FakePersistence, FakeMessageBus, _make_create_data, engine
and engine_with_bus fixtures) into a common helper module or conftest.py so each
new test file imports them; ensure tests reference the same symbols (TaskEngine,
TaskEngineConfig, CreateTaskMutation, UpdateTaskMutation,
TransitionTaskMutation, DeleteTaskMutation, CancelTaskMutation, TaskStatus,
TaskMutationResult) and run unchanged.

---

Outside diff comments:
In `@src/ai_company/api/app.py`:
- Around line 286-309: The shutdown sequence in _safe_shutdown stops the
MessageBusBridge (bridge.stop()) before the TaskEngine (task_engine.stop()),
which prevents TaskStateChanged snapshots emitted during engine drain from
reaching WebSocket subscribers; change the order so you await task_engine.stop()
while the bridge is still running, then stop the bridge (i.e., call
task_engine.stop() before bridge.stop() in _safe_shutdown), keeping the existing
try/except logging behavior for both operations.

In `@src/ai_company/api/controllers/tasks.py`:
- Around line 76-80: Controller code calls TaskEngine.get_task() and
list_tasks() directly (e.g., the calls to app_state.task_engine.list_tasks and
get_task) which bypasses the queued/sanitized mutation path and perform an
out‑of‑band pre-read for previous/from_status; instead route reads through the
same engine queue or add exception handling around these calls to convert
backend errors into controlled HTTP errors, and remove the separate pre-read for
previous_status by adding and using a previous_status (or from_status) field
returned by TaskEngine methods so the controller no longer performs a separate
persistence read.

In `@src/ai_company/engine/agent_engine.py`:
- Around line 339-344: The code currently calls _report_to_task_engine() before
recovery so results with TerminationReason.ERROR remain IN_PROGRESS and fatal
exceptions handled by _handle_fatal_error() never report their final FAILED
state; update the flow so reporting uses the post-recovery ExecutionResult
(i.e., call _report_to_task_engine() only after
_post_execution_pipeline()/_apply_recovery() or centralize a single reporting
step that runs after both the normal and fatal-error paths produce their final
ExecutionResult), ensuring functions _post_execution_pipeline, _apply_recovery,
and _handle_fatal_error all funnel into that single reporting point and that the
reported state reflects the recovered status.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: c798c7d4-086c-45c7-a824-db14ab7084a7

📥 Commits

Reviewing files that changed from the base of the PR and between ef4e7df and 03f14d4.

📒 Files selected for processing (10)
  • docs/design/engine.md
  • src/ai_company/api/app.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/api/state.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/observability/events/api.py
  • tests/unit/engine/test_agent_engine.py
  • tests/unit/engine/test_task_engine.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Greptile Review
  • GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649
Use except A, B: syntax (no parentheses) for exception handling — PEP 758 syntax enforced by ruff on Python 3.14
Add type hints to all public functions — enforce mypy strict mode
Use Google-style docstrings — required on public classes and functions, enforced by ruff D rules
For frozen Pydantic models with dict/list fields, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use Pydantic v2 BaseModel, model_validator, computed_field, and ConfigDict — avoid redundant stored fields with @computed_field
Use NotBlankStr (from core.types) for all identifier/name fields, including optional (NotBlankStr | None) and tuple variants, instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code instead of bare create_task
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly — never silently swallow exceptions
Enforce line length of 88 characters via ruff
Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using model_copy(update=...)) for runtime state that evolves
Never mix static config fields with mutable runtime fields in one model

Files:

  • tests/unit/engine/test_agent_engine.py
  • src/ai_company/api/app.py
  • tests/unit/engine/test_task_engine.py
  • src/ai_company/observability/events/api.py
  • src/ai_company/api/state.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/engine/agent_engine.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Use @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow markers for test organization
Prefer @pytest.mark.parametrize for testing similar cases
Never use real vendor names in test files — use test-provider, test-small-001, and generic model names

Files:

  • tests/unit/engine/test_agent_engine.py
  • tests/unit/engine/test_task_engine.py
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic MUST have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging, logging.getLogger(), or print() in application code — use the observability logger instead
Always use logger as the variable name (not _logger or log)
Use event name constants from domain-specific modules under ai_company.observability.events (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget) — import directly
Use structured logging with kwargs: logger.info(EVENT, key=value) — never use string formatting like logger.info("msg %s", val)
Log all error paths at WARNING or ERROR with context before raising
Log all state transitions at INFO level
Use DEBUG level for object creation, internal flow, and entry/exit of key functions

Files:

  • src/ai_company/api/app.py
  • src/ai_company/observability/events/api.py
  • src/ai_company/api/state.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/engine/agent_engine.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases. Tests must use test-provider, test-small-001, etc.

Files:

  • src/ai_company/api/app.py
  • src/ai_company/observability/events/api.py
  • src/ai_company/api/state.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/engine/agent_engine.py
🧠 Learnings (6)
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly

Applied to files:

  • src/ai_company/api/app.py
  • src/ai_company/observability/events/api.py
  • src/ai_company/api/state.py
  • src/ai_company/api/controllers/tasks.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`

Applied to files:

  • src/ai_company/api/app.py
  • src/ai_company/api/state.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Log all state transitions at INFO level

Applied to files:

  • src/ai_company/observability/events/api.py
  • src/ai_company/api/controllers/tasks.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Never use `import logging`, `logging.getLogger()`, or `print()` in application code — use the observability logger instead

Applied to files:

  • src/ai_company/api/state.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : Handle errors explicitly — never silently swallow exceptions

Applied to files:

  • src/ai_company/engine/agent_engine.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : Use `except A, B:` syntax (no parentheses) for exception handling — PEP 758 syntax enforced by ruff on Python 3.14

Applied to files:

  • src/ai_company/engine/agent_engine.py
🧬 Code graph analysis (7)
tests/unit/engine/test_agent_engine.py (1)
src/ai_company/engine/errors.py (2)
  • ExecutionStateError (12-13)
  • TaskMutationError (97-98)
src/ai_company/api/app.py (3)
src/ai_company/api/state.py (1)
  • task_engine (107-109)
src/ai_company/engine/task_engine.py (3)
  • TaskEngine (80-900)
  • stop (138-175)
  • start (118-136)
src/ai_company/memory/errors.py (1)
  • MemoryError (13-14)
tests/unit/engine/test_task_engine.py (5)
src/ai_company/core/enums.py (2)
  • TaskStatus (198-224)
  • TaskType (227-235)
src/ai_company/core/task.py (1)
  • Task (45-261)
src/ai_company/engine/errors.py (5)
  • TaskEngineNotRunningError (89-90)
  • TaskEngineQueueFullError (93-94)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
  • TaskVersionConflictError (105-106)
src/ai_company/engine/task_engine.py (12)
  • TaskEngine (80-900)
  • list_tasks (450-471)
  • start (118-136)
  • stop (138-175)
  • is_running (192-194)
  • submit (198-233)
  • create_task (235-266)
  • transition_task (309-355)
  • delete_task (357-386)
  • cancel_task (388-423)
  • _MutationEnvelope (67-77)
  • _fail_remaining_futures (177-189)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
src/ai_company/api/state.py (1)
src/ai_company/engine/task_engine.py (1)
  • TaskEngine (80-900)
src/ai_company/engine/task_engine.py (5)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/core/task.py (1)
  • Task (45-261)
src/ai_company/engine/errors.py (5)
  • TaskEngineNotRunningError (89-90)
  • TaskEngineQueueFullError (93-94)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
  • TaskVersionConflictError (105-106)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
src/ai_company/engine/task_engine_models.py (8)
  • CancelTaskMutation (202-219)
  • CreateTaskData (20-59)
  • CreateTaskMutation (65-80)
  • DeleteTaskMutation (184-199)
  • TaskMutationResult (235-276)
  • TaskStateChanged (282-319)
  • TransitionTaskMutation (143-181)
  • UpdateTaskMutation (98-129)
src/ai_company/api/controllers/tasks.py (6)
src/ai_company/api/state.py (1)
  • task_engine (107-109)
src/ai_company/core/task.py (1)
  • Task (45-261)
src/ai_company/engine/errors.py (4)
  • TaskEngineNotRunningError (89-90)
  • TaskEngineQueueFullError (93-94)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
src/ai_company/engine/task_engine_models.py (1)
  • CreateTaskData (20-59)
src/ai_company/engine/task_engine.py (6)
  • list_tasks (450-471)
  • get_task (439-448)
  • create_task (235-266)
  • update_task (268-307)
  • transition_task (309-355)
  • delete_task (357-386)
src/ai_company/api/dto.py (1)
  • ApiResponse (31-49)
src/ai_company/engine/task_engine_models.py (2)
src/ai_company/core/enums.py (4)
  • Complexity (247-253)
  • Priority (238-244)
  • TaskStatus (198-224)
  • TaskType (227-235)
src/ai_company/core/task.py (1)
  • Task (45-261)
🪛 LanguageTool
docs/design/engine.md

[style] ~186-~186: Consider using the typographical ellipsis character here instead.
Context: ...mmutable updates**: Each mutation calls model_copy(update=...) on frozen Task models — the origi...

(ELLIPSIS)

🔇 Additional comments (11)
src/ai_company/engine/task_engine.py (5)

1-64: LGTM: Module setup and imports follow coding guidelines.

Logger setup correctly uses from ai_company.observability import get_logger and assigns to logger variable. Imports are well-organized with TYPE_CHECKING guard for protocol types.


66-115: LGTM: Envelope and initialization are well-designed.

Good use of dataclass for internal envelope with appropriate documentation warning about event loop requirement. Engine initialization properly sets up queue with configurable max size.


425-436: LGTM: Error classification now uses error_code instead of string matching.

The _raise_typed_error method correctly matches on result.error_code to dispatch typed exceptions. This addresses the previous review concern about fragile string-based error classification.


808-867: LGTM: Snapshot publishing with best-effort semantics and proper exception handling.

Uses PEP 758 exception syntax (except MemoryError, RecursionError:) as required by coding guidelines. Best-effort publish with logged warnings on failure is appropriate for non-critical snapshot publishing.


870-900: LGTM: Version tracking implementation is sound.

Clean implementation of optimistic concurrency with proper logging of version conflicts.

src/ai_company/engine/task_engine_models.py (6)

1-60: LGTM: Module docstring and CreateTaskData model are well-defined.

Frozen configuration with proper field types using NotBlankStr for identifiers. Good documentation and field constraints.


83-96: LGTM: _IMMUTABLE_TASK_FIELDS correctly matches Task model fields.

The immutable field set now correctly contains only fields that exist on the Task model (id, status, created_by), addressing the previous review concern about nonexistent timestamp fields.


98-182: LGTM: Update and Transition mutations with immutable field protection.

Both mutations correctly use @model_validator(mode="after") with Self return type to reject attempts to modify immutable fields. Frozen configurations ensure request immutability.


184-229: LGTM: Delete, Cancel mutations and TaskMutation union.

Simple frozen models with appropriate fields. Union type enables polymorphic mutation handling in the engine.


235-277: LGTM: TaskMutationResult with error_code enables reliable error dispatch.

The error_code field with typed literals ("not_found", "version_conflict", "validation", "internal") addresses the past review concern about fragile string matching. The _check_consistency validator ensures success/error invariants are maintained.


282-319: LGTM: TaskStateChanged event model for snapshot publication.

Properly typed event model with AwareDatetime and UTC-aware default timestamp. Captures all necessary context for downstream consumers.

Comment thread src/ai_company/api/app.py
Comment on lines +137 to +147
try:
task = await app_state.task_engine.create_task(
task_data,
requested_by=data.created_by,
)
except TaskEngineNotRunningError as exc:
raise ServiceUnavailableError(str(exc)) from exc
except TaskEngineQueueFullError as exc:
raise ServiceUnavailableError(str(exc)) from exc
except TaskMutationError as exc:
raise ApiValidationError(str(exc)) from exc
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't treat catch-all TaskMutationError as a validation error.

The engine also uses generic TaskMutationError for sanitized internal processing failures, so create_task(), update_task(), and transition_task() currently return 4xx for backend faults, while delete_task() lets the same exception escape uncaught. Keep 4xx mappings for typed business errors only; generic engine failures need their own 5xx/503 path or an error_code-based mapper.

Also applies to: 177-195, 248-255, 283-299

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/api/controllers/tasks.py` around lines 137 - 147, The handlers
calling app_state.task_engine (create_task, update_task, transition_task — and
ensure delete_task too) should not map the generic TaskMutationError to
ApiValidationError; instead catch TaskMutationError and convert it to a
5xx-level error (e.g., raise ServiceUnavailableError or an InternalServerError)
or route it through an error_code-based mapper so backend faults return 5xx,
while keeping ApiValidationError for typed business/validation errors only;
update the exception blocks around app_state.task_engine.create_task (and the
similar blocks in update_task, transition_task, delete_task) to remove the
TaskMutationError -> ApiValidationError mapping and replace it with the
appropriate server error mapping.

Comment on lines +690 to +699
try:
await self._task_engine.transition_task(
task_id,
final_status,
requested_by=agent_id,
reason=(
"AgentEngine execution ended: "
f"{execution_result.termination_reason.value}"
),
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

One-hop COMPLETED reporting will be rejected by the real TaskEngine.

AgentEngine completes locally via staged transitions, but this reporter sends a single transition_task(..., COMPLETED). Task.with_transition() only allows COMPLETED from IN_REVIEW, so successful runs starting from the persisted ASSIGNED / IN_PROGRESS states will warn and leave TaskEngine stale. Report the full sequence, including IN_PROGRESS when needed, or add a TaskEngine API that accepts the already-computed final snapshot.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/agent_engine.py` around lines 690 - 699, AgentEngine
currently calls self._task_engine.transition_task(...) directly to COMPLETED
which violates Task.with_transition() allowed paths; update the code around the
try block that calls _task_engine.transition_task to instead fetch the task's
current persisted status (via the TaskEngine read API) and emit the required
staged transitions (e.g., ASSIGNED -> IN_PROGRESS, IN_PROGRESS -> IN_REVIEW,
IN_REVIEW -> COMPLETED) by calling transition_task for each intermediate status
in order, preserving the same requested_by and reason (use
execution_result.termination_reason.value); alternatively, if you prefer a
single-shot change, add a TaskEngine API that accepts the already-computed final
snapshot and use that from AgentEngine so the TaskEngine can validate/apply the
full state transition atomically.

Comment thread src/ai_company/engine/task_engine_models.py
Comment thread src/ai_company/engine/task_engine.py
Comment on lines +697 to +710
except ValueError as exc:
logger.warning(
TASK_ENGINE_MUTATION_FAILED,
mutation_type="transition",
request_id=mutation.request_id,
task_id=mutation.task_id,
error=str(exc),
)
return TaskMutationResult(
request_id=mutation.request_id,
success=False,
error=str(exc),
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add error_code="validation" for invalid transition failures.

When with_transition raises ValueError (invalid state machine transition), the result lacks error_code. This causes _raise_typed_error to raise a generic TaskMutationError instead of allowing callers to distinguish validation failures from internal errors. The TaskMutationResult model explicitly supports "validation" as an error code.

🔧 Proposed fix
             return TaskMutationResult(
                 request_id=mutation.request_id,
                 success=False,
                 error=str(exc),
+                error_code="validation",
             )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except ValueError as exc:
logger.warning(
TASK_ENGINE_MUTATION_FAILED,
mutation_type="transition",
request_id=mutation.request_id,
task_id=mutation.task_id,
error=str(exc),
)
return TaskMutationResult(
request_id=mutation.request_id,
success=False,
error=str(exc),
)
except ValueError as exc:
logger.warning(
TASK_ENGINE_MUTATION_FAILED,
mutation_type="transition",
request_id=mutation.request_id,
task_id=mutation.task_id,
error=str(exc),
)
return TaskMutationResult(
request_id=mutation.request_id,
success=False,
error=str(exc),
error_code="validation",
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_engine.py` around lines 697 - 710, The ValueError
handler for invalid transitions should mark the failure as a validation error:
in the except ValueError as exc block update the logger.warning call
(TASK_ENGINE_MUTATION_FAILED) to include error_code="validation" and return the
TaskMutationResult with error_code="validation" (alongside request_id,
success=False, error=str(exc)) so callers and _raise_typed_error can distinguish
validation failures from internal errors.

Comment thread src/ai_company/engine/task_engine.py
Comment thread tests/unit/engine/test_task_engine.py Outdated
Comment on lines +1 to +1140
"""Tests for the centralized single-writer TaskEngine."""

import asyncio
from collections.abc import AsyncGenerator # noqa: TC003

import pytest

from ai_company.core.enums import (
TaskStatus,
TaskType,
)
from ai_company.core.task import Task # noqa: TC001
from ai_company.engine.errors import (
TaskEngineNotRunningError,
TaskEngineQueueFullError,
TaskMutationError,
TaskNotFoundError,
TaskVersionConflictError,
)
from ai_company.engine.task_engine import TaskEngine
from ai_company.engine.task_engine_config import TaskEngineConfig
from ai_company.engine.task_engine_models import (
CancelTaskMutation,
CreateTaskData,
CreateTaskMutation,
DeleteTaskMutation,
TransitionTaskMutation,
UpdateTaskMutation,
)

# ── Fakes ─────────────────────────────────────────────────────


class FakeTaskRepository:
"""Minimal in-memory task repository for engine tests."""

def __init__(self) -> None:
self._tasks: dict[str, Task] = {}

async def save(self, task: Task) -> None:
self._tasks[task.id] = task

async def get(self, task_id: str) -> Task | None:
return self._tasks.get(task_id)

async def list_tasks(
self,
*,
status: TaskStatus | None = None,
assigned_to: str | None = None,
project: str | None = None,
) -> tuple[Task, ...]:
result = list(self._tasks.values())
if status is not None:
result = [t for t in result if t.status == status]
if assigned_to is not None:
result = [t for t in result if t.assigned_to == assigned_to]
if project is not None:
result = [t for t in result if t.project == project]
return tuple(result)

async def delete(self, task_id: str) -> bool:
return self._tasks.pop(task_id, None) is not None


class FakePersistence:
"""Minimal fake persistence backend with only a task repository."""

def __init__(self) -> None:
self._tasks = FakeTaskRepository()

@property
def tasks(self) -> FakeTaskRepository:
return self._tasks


class FakeMessageBus:
"""Minimal fake message bus that records published messages."""

def __init__(self) -> None:
self.published: list[object] = []
self._running = False

async def start(self) -> None:
self._running = True

async def stop(self) -> None:
self._running = False

@property
def is_running(self) -> bool:
return self._running

async def publish(self, message: object) -> None:
self.published.append(message)


class FailingMessageBus(FakeMessageBus):
"""Message bus that always fails on publish."""

async def publish(self, message: object) -> None:
msg = "Publish failed"
raise RuntimeError(msg)


# ── Fixtures ──────────────────────────────────────────────────


def _make_create_data(**overrides: object) -> CreateTaskData:
"""Build a CreateTaskData with sensible defaults."""
defaults: dict[str, object] = {
"title": "Test task",
"description": "A test task",
"type": TaskType.DEVELOPMENT,
"project": "test-project",
"created_by": "test-agent",
}
defaults.update(overrides)
return CreateTaskData(**defaults) # type: ignore[arg-type]


@pytest.fixture
def persistence() -> FakePersistence:
return FakePersistence()


@pytest.fixture
def message_bus() -> FakeMessageBus:
return FakeMessageBus()


@pytest.fixture
def config() -> TaskEngineConfig:
return TaskEngineConfig(max_queue_size=100)


@pytest.fixture
async def engine(
persistence: FakePersistence,
config: TaskEngineConfig,
) -> AsyncGenerator[TaskEngine]:
"""Create and start a TaskEngine, stop on teardown."""
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
config=config,
)
eng.start()
yield eng
await eng.stop(timeout=2.0)


@pytest.fixture
async def engine_with_bus(
persistence: FakePersistence,
message_bus: FakeMessageBus,
config: TaskEngineConfig,
) -> AsyncGenerator[TaskEngine]:
"""Create and start a TaskEngine with a message bus."""
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
message_bus=message_bus, # type: ignore[arg-type]
config=config,
)
eng.start()
yield eng
await eng.stop(timeout=2.0)


# ── Lifecycle tests ───────────────────────────────────────────


@pytest.mark.unit
class TestTaskEngineLifecycle:
"""Tests for start/stop lifecycle."""

async def test_start_sets_running(
self,
persistence: FakePersistence,
) -> None:
eng = TaskEngine(persistence=persistence) # type: ignore[arg-type]
assert eng.is_running is False
eng.start()
assert eng.is_running is True
await eng.stop(timeout=2.0) # type: ignore[unreachable]
assert eng.is_running is False

async def test_double_start_raises(
self,
persistence: FakePersistence,
) -> None:
eng = TaskEngine(persistence=persistence) # type: ignore[arg-type]
eng.start()
with pytest.raises(RuntimeError, match="already running"):
eng.start()
await eng.stop(timeout=2.0)

async def test_stop_idempotent(
self,
persistence: FakePersistence,
) -> None:
eng = TaskEngine(persistence=persistence) # type: ignore[arg-type]
eng.start()
await eng.stop(timeout=2.0)
await eng.stop(timeout=2.0) # no error

async def test_restart(
self,
persistence: FakePersistence,
) -> None:
eng = TaskEngine(persistence=persistence) # type: ignore[arg-type]
eng.start()
await eng.stop(timeout=2.0)
eng.start()
assert eng.is_running is True
await eng.stop(timeout=2.0)


# ── Submit to stopped engine ──────────────────────────────────


@pytest.mark.unit
class TestSubmitToStoppedEngine:
"""Submitting to a stopped engine raises TaskEngineNotRunningError."""

async def test_submit_raises(
self,
persistence: FakePersistence,
) -> None:
eng = TaskEngine(persistence=persistence) # type: ignore[arg-type]
mutation = CreateTaskMutation(
request_id="req-1",
requested_by="alice",
task_data=_make_create_data(),
)
with pytest.raises(TaskEngineNotRunningError):
await eng.submit(mutation)


# ── Create mutation ───────────────────────────────────────────


@pytest.mark.unit
class TestCreateTask:
"""Tests for task creation via TaskEngine."""

async def test_create_task(
self,
engine: TaskEngine,
persistence: FakePersistence,
) -> None:
task = await engine.create_task(
_make_create_data(title="My Task"),
requested_by="alice",
)
assert task.title == "My Task"
assert task.id.startswith("task-")
assert task.status == TaskStatus.CREATED

stored = await persistence.tasks.get(task.id)
assert stored is not None
assert stored.title == "My Task"

async def test_create_returns_version_1(
self,
engine: TaskEngine,
) -> None:
mutation = CreateTaskMutation(
request_id="req-1",
requested_by="alice",
task_data=_make_create_data(),
)
result = await engine.submit(mutation)
assert result.success is True
assert result.version == 1

async def test_create_with_assignee(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(assigned_to=None),
requested_by="alice",
)
assert task.assigned_to is None


# ── Update mutation ───────────────────────────────────────────


@pytest.mark.unit
class TestUpdateTask:
"""Tests for task update via TaskEngine."""

async def test_update_fields(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(title="Original"),
requested_by="alice",
)
updated = await engine.update_task(
task.id,
{"title": "Updated"},
requested_by="alice",
)
assert updated.title == "Updated"
assert updated.id == task.id

async def test_update_empty_no_op(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
result = await engine.update_task(
task.id,
{},
requested_by="alice",
)
assert result.title == task.title

async def test_update_not_found(
self,
engine: TaskEngine,
) -> None:
with pytest.raises(TaskMutationError, match="not found"):
await engine.update_task(
"task-nonexistent",
{"title": "X"},
requested_by="alice",
)


# ── Transition mutation ───────────────────────────────────────


@pytest.mark.unit
class TestTransitionTask:
"""Tests for task status transitions via TaskEngine."""

async def test_valid_transition(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
assigned = await engine.transition_task(
task.id,
TaskStatus.ASSIGNED,
requested_by="alice",
reason="Assigning",
assigned_to="bob",
)
assert assigned.status == TaskStatus.ASSIGNED
assert assigned.assigned_to == "bob"

async def test_invalid_transition(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
with pytest.raises(TaskMutationError):
await engine.transition_task(
task.id,
TaskStatus.COMPLETED,
requested_by="alice",
reason="Skip to done",
assigned_to="bob",
)

async def test_transition_not_found(
self,
engine: TaskEngine,
) -> None:
with pytest.raises(TaskMutationError, match="not found"):
await engine.transition_task(
"task-nonexistent",
TaskStatus.ASSIGNED,
requested_by="alice",
reason="test",
)


# ── Delete mutation ───────────────────────────────────────────


@pytest.mark.unit
class TestDeleteTask:
"""Tests for task deletion via TaskEngine."""

async def test_delete_task(
self,
engine: TaskEngine,
persistence: FakePersistence,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
deleted = await engine.delete_task(task.id, requested_by="alice")
assert deleted is True

stored = await persistence.tasks.get(task.id)
assert stored is None

async def test_delete_not_found(
self,
engine: TaskEngine,
) -> None:
with pytest.raises(TaskMutationError, match="not found"):
await engine.delete_task(
"task-nonexistent",
requested_by="alice",
)


# ── Cancel mutation ───────────────────────────────────────────


@pytest.mark.unit
class TestCancelTask:
"""Tests for task cancellation via TaskEngine."""

async def test_cancel_assigned_task(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
assigned = await engine.transition_task(
task.id,
TaskStatus.ASSIGNED,
requested_by="alice",
reason="Assigning",
assigned_to="bob",
)
cancelled = await engine.cancel_task(
assigned.id,
requested_by="alice",
reason="No longer needed",
)
assert cancelled.status == TaskStatus.CANCELLED

async def test_cancel_from_created_fails(
self,
engine: TaskEngine,
) -> None:
"""CREATED -> CANCELLED is not a valid transition."""
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
with pytest.raises(TaskMutationError):
await engine.cancel_task(
task.id,
requested_by="alice",
reason="Oops",
)


# ── Read-through ──────────────────────────────────────────────


@pytest.mark.unit
class TestReadThrough:
"""Tests for read-through methods that bypass the queue."""

async def test_get_task(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(title="Findme"),
requested_by="alice",
)
found = await engine.get_task(task.id)
assert found is not None
assert found.title == "Findme"

async def test_get_task_not_found(
self,
engine: TaskEngine,
) -> None:
result = await engine.get_task("task-nonexistent")
assert result is None

async def test_list_tasks(
self,
engine: TaskEngine,
) -> None:
await engine.create_task(
_make_create_data(project="proj-a"),
requested_by="alice",
)
await engine.create_task(
_make_create_data(project="proj-b"),
requested_by="alice",
)
all_tasks = await engine.list_tasks()
assert len(all_tasks) == 2

filtered = await engine.list_tasks(project="proj-a")
assert len(filtered) == 1

async def test_list_tasks_by_status(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
await engine.transition_task(
task.id,
TaskStatus.ASSIGNED,
requested_by="alice",
reason="Assigning",
assigned_to="bob",
)

created = await engine.list_tasks(status=TaskStatus.CREATED)
assigned = await engine.list_tasks(status=TaskStatus.ASSIGNED)
assert len(created) == 0
assert len(assigned) == 1


# ── Version tracking ──────────────────────────────────────────


@pytest.mark.unit
class TestVersionTracking:
"""Tests for the in-memory version counter."""

async def test_version_increments(
self,
engine: TaskEngine,
) -> None:
mutation = CreateTaskMutation(
request_id="req-1",
requested_by="alice",
task_data=_make_create_data(),
)
r1 = await engine.submit(mutation)
assert r1.version == 1

update = UpdateTaskMutation(
request_id="req-2",
requested_by="alice",
task_id=r1.task.id, # type: ignore[union-attr]
updates={"title": "Updated"},
)
r2 = await engine.submit(update)
assert r2.version == 2

async def test_version_conflict(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
# version is 1 after create; expected_version=99 should fail
update = UpdateTaskMutation(
request_id="req-2",
requested_by="alice",
task_id=task.id,
updates={"title": "X"},
expected_version=99,
)
result = await engine.submit(update)
assert result.success is False
assert "conflict" in (result.error or "").lower()

async def test_version_reset_on_delete(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
delete = DeleteTaskMutation(
request_id="req-3",
requested_by="alice",
task_id=task.id,
)
result = await engine.submit(delete)
assert result.version == 0


# ── Snapshot publishing ───────────────────────────────────────


@pytest.mark.unit
class TestSnapshotPublishing:
"""Tests for event publishing to the message bus."""

async def test_snapshot_published_on_create(
self,
engine_with_bus: TaskEngine,
message_bus: FakeMessageBus,
) -> None:
await engine_with_bus.create_task(
_make_create_data(),
requested_by="alice",
)
# Yield to event loop so the processing loop completes snapshot publication
await asyncio.sleep(0)
assert len(message_bus.published) == 1

async def test_snapshot_publish_failure_does_not_affect_mutation(
self,
persistence: FakePersistence,
config: TaskEngineConfig,
) -> None:
failing_bus = FailingMessageBus()
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
message_bus=failing_bus, # type: ignore[arg-type]
config=config,
)
eng.start()
try:
task = await eng.create_task(
_make_create_data(),
requested_by="alice",
)
assert task.id.startswith("task-")

stored = await persistence.tasks.get(task.id)
assert stored is not None
finally:
await eng.stop(timeout=2.0)

async def test_no_snapshot_when_disabled(
self,
persistence: FakePersistence,
message_bus: FakeMessageBus,
) -> None:
no_snap_config = TaskEngineConfig(publish_snapshots=False)
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
message_bus=message_bus, # type: ignore[arg-type]
config=no_snap_config,
)
eng.start()
try:
await eng.create_task(
_make_create_data(),
requested_by="alice",
)
await asyncio.sleep(0)
assert len(message_bus.published) == 0
finally:
await eng.stop(timeout=2.0)


# ── Sequential ordering ──────────────────────────────────────


@pytest.mark.unit
class TestSequentialOrdering:
"""Tests that mutations are processed sequentially."""

async def test_concurrent_submits(
self,
engine: TaskEngine,
) -> None:
"""Multiple concurrent creates all succeed without interleaving."""
tasks = await asyncio.gather(
*(
engine.create_task(
_make_create_data(title=f"Task {i}"),
requested_by="alice",
)
for i in range(10)
),
)
assert len(tasks) == 10
ids = {t.id for t in tasks}
assert len(ids) == 10 # all unique


# ── Drain on stop ─────────────────────────────────────────────


@pytest.mark.unit
class TestDrainOnStop:
"""Tests that stop() drains pending mutations."""

async def test_pending_mutations_processed(
self,
persistence: FakePersistence,
) -> None:
config = TaskEngineConfig(max_queue_size=100)
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
config=config,
)
eng.start()

# Submit several mutations
results = await asyncio.gather(
*(
eng.create_task(
_make_create_data(title=f"Drain {i}"),
requested_by="alice",
)
for i in range(5)
),
)
assert len(results) == 5

await eng.stop(timeout=5.0)
assert eng.is_running is False

# All tasks should be persisted
all_tasks = await persistence.tasks.list_tasks()
assert len(all_tasks) == 5


# ── Queue full ────────────────────────────────────────────────


@pytest.mark.unit
class TestQueueFull:
"""Tests for queue full backpressure."""

async def test_queue_full_raises(
self,
persistence: FakePersistence,
) -> None:
from ai_company.engine.task_engine import _MutationEnvelope

tiny_config = TaskEngineConfig(max_queue_size=1)
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
config=tiny_config,
)
# Start the engine but pause the processing loop
eng._running = True

# First submit fills the queue
mutation1 = CreateTaskMutation(
request_id="req-1",
requested_by="alice",
task_data=_make_create_data(),
)
eng._queue.put_nowait(_MutationEnvelope(mutation=mutation1))

# Second submit should fail because queue is full
mutation2 = CreateTaskMutation(
request_id="req-2",
requested_by="alice",
task_data=_make_create_data(),
)
with pytest.raises(TaskEngineQueueFullError, match="queue is full"):
await eng.submit(mutation2)

eng._running = False


# ── Error propagation ────────────────────────────────────────


@pytest.mark.unit
class TestErrorPropagation:
"""Tests for error propagation via futures."""

async def test_persistence_error_returns_failure(
self,
persistence: FakePersistence,
config: TaskEngineConfig,
) -> None:
"""Persistence errors during mutation are captured in the result."""

class FailingSaveRepo(FakeTaskRepository):
async def save(self, task: Task) -> None:
msg = "Disk full"
raise OSError(msg)

persistence._tasks = FailingSaveRepo()
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
config=config,
)
eng.start()
try:
mutation = CreateTaskMutation(
request_id="req-1",
requested_by="alice",
task_data=_make_create_data(),
)
result = await eng.submit(mutation)
assert result.success is False
assert result.error == "Internal error processing mutation"
finally:
await eng.stop(timeout=2.0)


# ── TaskEngineConfig ──────────────────────────────────────────


@pytest.mark.unit
class TestTaskEngineConfig:
"""Tests for TaskEngineConfig model."""

def test_defaults(self) -> None:
config = TaskEngineConfig()
assert config.max_queue_size == 1000
assert config.drain_timeout_seconds == 10.0
assert config.publish_snapshots is True

def test_custom_values(self) -> None:
config = TaskEngineConfig(
max_queue_size=500,
drain_timeout_seconds=5.0,
publish_snapshots=False,
)
assert config.max_queue_size == 500
assert config.drain_timeout_seconds == 5.0
assert config.publish_snapshots is False

def test_frozen(self) -> None:
from pydantic import ValidationError

config = TaskEngineConfig()
with pytest.raises(ValidationError):
config.max_queue_size = 999 # type: ignore[misc]


# -- Version conflict on transition ────────────────────────────


@pytest.mark.unit
class TestVersionConflictOnTransition:
"""Version conflict detection on transition mutations."""

async def test_transition_version_conflict(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
mutation = TransitionTaskMutation(
request_id="req-1",
requested_by="alice",
task_id=task.id,
target_status=TaskStatus.ASSIGNED,
reason="Assigning",
overrides={"assigned_to": "bob"},
expected_version=99,
)
result = await engine.submit(mutation)
assert result.success is False
assert "conflict" in (result.error or "").lower()


# -- Cancel not found ─────────────────────────────────────────


@pytest.mark.unit
class TestCancelNotFound:
"""Cancel mutation on a non-existent task."""

async def test_cancel_not_found(
self,
engine: TaskEngine,
) -> None:
with pytest.raises(TaskNotFoundError, match="not found"):
await engine.cancel_task(
"task-nonexistent",
requested_by="alice",
reason="test",
)


# -- Previous status in results ────────────────────────────────


@pytest.mark.unit
class TestPreviousStatus:
"""Verify previous_status is populated in mutation results."""

async def test_create_has_no_previous_status(
self,
engine: TaskEngine,
) -> None:
mutation = CreateTaskMutation(
request_id="req-1",
requested_by="alice",
task_data=_make_create_data(),
)
result = await engine.submit(mutation)
assert result.success is True
assert result.previous_status is None

async def test_transition_has_previous_status(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
mutation = TransitionTaskMutation(
request_id="req-1",
requested_by="alice",
task_id=task.id,
target_status=TaskStatus.ASSIGNED,
reason="Assigning",
overrides={"assigned_to": "bob"},
)
result = await engine.submit(mutation)
assert result.success is True
assert result.previous_status == TaskStatus.CREATED

async def test_cancel_has_previous_status(
self,
engine: TaskEngine,
) -> None:
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
# First move to ASSIGNED so cancel is valid
await engine.transition_task(
task.id,
TaskStatus.ASSIGNED,
requested_by="alice",
reason="Assigning",
assigned_to="bob",
)
mutation = CancelTaskMutation(
request_id="req-1",
requested_by="alice",
task_id=task.id,
reason="No longer needed",
)
result = await engine.submit(mutation)
assert result.success is True
assert result.previous_status == TaskStatus.ASSIGNED


# -- Immutable field rejection ─────────────────────────────────


@pytest.mark.unit
class TestImmutableFieldRejection:
"""UpdateTaskMutation and TransitionTaskMutation reject immutable fields."""

def test_update_rejects_status(self) -> None:
from pydantic import ValidationError

with pytest.raises(ValidationError, match="immutable"):
UpdateTaskMutation(
request_id="req-1",
requested_by="alice",
task_id="task-1",
updates={"status": "completed"},
)

def test_update_rejects_id(self) -> None:
from pydantic import ValidationError

with pytest.raises(ValidationError, match="immutable"):
UpdateTaskMutation(
request_id="req-1",
requested_by="alice",
task_id="task-1",
updates={"id": "new-id"},
)

def test_transition_rejects_id_override(self) -> None:
from pydantic import ValidationError

with pytest.raises(ValidationError, match="immutable"):
TransitionTaskMutation(
request_id="req-1",
requested_by="alice",
task_id="task-1",
target_status=TaskStatus.ASSIGNED,
reason="test",
overrides={"id": "new-id"},
)


# -- Typed error propagation ──────────────────────────────────


@pytest.mark.unit
class TestTypedErrors:
"""Convenience methods raise typed errors."""

async def test_update_not_found_raises_typed(
self,
engine: TaskEngine,
) -> None:
with pytest.raises(TaskNotFoundError):
await engine.update_task(
"task-nonexistent",
{"title": "X"},
requested_by="alice",
)

async def test_delete_not_found_raises_typed(
self,
engine: TaskEngine,
) -> None:
with pytest.raises(TaskNotFoundError):
await engine.delete_task(
"task-nonexistent",
requested_by="alice",
)

async def test_transition_not_found_raises_typed(
self,
engine: TaskEngine,
) -> None:
with pytest.raises(TaskNotFoundError):
await engine.transition_task(
"task-nonexistent",
TaskStatus.ASSIGNED,
requested_by="alice",
reason="test",
)

async def test_update_version_conflict_raises_typed(
self,
engine: TaskEngine,
) -> None:
"""Version conflict via convenience method raises TaskVersionConflictError."""
task = await engine.create_task(
_make_create_data(),
requested_by="alice",
)
with pytest.raises(TaskVersionConflictError, match="conflict"):
await engine.update_task(
task.id,
{"title": "changed"},
requested_by="alice",
expected_version=99,
)


# -- Drain timeout / _fail_remaining_futures ──────────────────


@pytest.mark.unit
class TestDrainTimeout:
"""Verify _fail_remaining_futures resolves abandoned futures."""

async def test_abandoned_futures_resolved_on_drain_timeout(
self,
persistence: FakePersistence,
) -> None:
"""Futures left after drain timeout get failure results."""
from ai_company.engine.task_engine import _MutationEnvelope

config = TaskEngineConfig(drain_timeout_seconds=0.01)
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
config=config,
)
eng.start()

# Pause processing by filling with a slow mutation
mutation = CreateTaskMutation(
request_id="req-slow",
requested_by="alice",
task_data=_make_create_data(),
)
# Submit one that will process normally
await eng.submit(mutation)

# Now stop the engine — force a very short drain
# Submit directly to queue to avoid await
mutation2 = CreateTaskMutation(
request_id="req-abandoned",
requested_by="alice",
task_data=_make_create_data(),
)
envelope = _MutationEnvelope(mutation=mutation2)
eng._queue.put_nowait(envelope)
eng._running = False

# Call _fail_remaining_futures directly
eng._fail_remaining_futures()
assert envelope.future.done()
result = envelope.future.result()
assert result.success is False
assert "shut down" in (result.error or "")

await eng.stop(timeout=0.1)


# -- TaskMutationResult consistency ────────────────────────────


@pytest.mark.unit
class TestMutationResultConsistency:
"""Verify _check_consistency validator on TaskMutationResult."""

def test_success_with_error_rejected(self) -> None:
"""Successful result must not carry an error."""
from pydantic import ValidationError

from ai_company.engine.task_engine_models import TaskMutationResult

with pytest.raises(ValidationError, match="error"):
TaskMutationResult(
request_id="r",
success=True,
error="oops",
)

def test_failure_without_error_rejected(self) -> None:
"""Failed result must carry an error description."""
from pydantic import ValidationError

from ai_company.engine.task_engine_models import TaskMutationResult

with pytest.raises(ValidationError, match="error"):
TaskMutationResult(
request_id="r",
success=False,
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Split this new test module.

At ~1,140 lines, this file is already far past the repo's 800-line cap. Please break the lifecycle, mutation, publishing, and shutdown cases into smaller modules so failures stay local and the fakes/fixtures are easier to navigate and reuse.

As per coding guidelines Keep functions under 50 lines and files under 800 lines.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_task_engine.py` around lines 1 - 1140, The test module
exceeds the 800-line limit and should be split into smaller files — extract
groups of tests (e.g. lifecycle →
tests/unit/engine/test_task_engine_lifecycle.py,
create/update/transition/delete/cancel/read-through →
test_task_engine_mutations.py,
publishing/sequential/drain/queue/errors/versioning/config →
test_task_engine_integration.py) and move shared fakes/fixtures
(FakeTaskRepository, FakePersistence, FakeMessageBus, _make_create_data, engine
and engine_with_bus fixtures) into a common helper module or conftest.py so each
new test file imports them; ensure tests reference the same symbols (TaskEngine,
TaskEngineConfig, CreateTaskMutation, UpdateTaskMutation,
TransitionTaskMutation, DeleteTaskMutation, CancelTaskMutation, TaskStatus,
TaskMutationResult) and run unchanged.

Comment thread tests/unit/engine/test_task_engine.py Outdated
Comment on lines +702 to +730
async def test_pending_mutations_processed(
self,
persistence: FakePersistence,
) -> None:
config = TaskEngineConfig(max_queue_size=100)
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
config=config,
)
eng.start()

# Submit several mutations
results = await asyncio.gather(
*(
eng.create_task(
_make_create_data(title=f"Drain {i}"),
requested_by="alice",
)
for i in range(5)
),
)
assert len(results) == 5

await eng.stop(timeout=5.0)
assert eng.is_running is False

# All tasks should be persisted
all_tasks = await persistence.tasks.list_tasks()
assert len(all_tasks) == 5
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

This doesn't exercise stop()-time draining.

All five create_task() calls are awaited before stop() starts, so the queue has already drained by the time the assertion runs. Submit work without awaiting completion, call stop(), then assert those pending futures resolve and persistence contains the drained tasks.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_task_engine.py` around lines 702 - 730, Modify
test_pending_mutations_processed to exercise stop()-time draining by submitting
create_task calls without awaiting them (collect the returned futures from
TaskEngine.create_task), then call eng.stop(timeout=5.0) while those futures are
still pending, then await asyncio.gather on the collected futures to ensure they
resolved and finally check persistence.tasks.list_tasks() has 5 entries; keep
references to TaskEngine.start, TaskEngine.create_task, TaskEngine.stop, and
persistence.tasks.list_tasks when locating where to change the test.

Comment thread tests/unit/engine/test_task_engine.py Outdated
Comment on lines +1066 to +1108
async def test_abandoned_futures_resolved_on_drain_timeout(
self,
persistence: FakePersistence,
) -> None:
"""Futures left after drain timeout get failure results."""
from ai_company.engine.task_engine import _MutationEnvelope

config = TaskEngineConfig(drain_timeout_seconds=0.01)
eng = TaskEngine(
persistence=persistence, # type: ignore[arg-type]
config=config,
)
eng.start()

# Pause processing by filling with a slow mutation
mutation = CreateTaskMutation(
request_id="req-slow",
requested_by="alice",
task_data=_make_create_data(),
)
# Submit one that will process normally
await eng.submit(mutation)

# Now stop the engine — force a very short drain
# Submit directly to queue to avoid await
mutation2 = CreateTaskMutation(
request_id="req-abandoned",
requested_by="alice",
task_data=_make_create_data(),
)
envelope = _MutationEnvelope(mutation=mutation2)
eng._queue.put_nowait(envelope)
eng._running = False

# Call _fail_remaining_futures directly
eng._fail_remaining_futures()
assert envelope.future.done()
result = envelope.future.result()
assert result.success is False
assert "shut down" in (result.error or "")

await eng.stop(timeout=0.1)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The drain-timeout test bypasses the timeout path.

Setting _running = False and calling _fail_remaining_futures() directly means stop() returns immediately, so this test never verifies the real timeout cleanup branch. Force _processing_task to stay pending, call stop(timeout=...), and assert that path resolves the abandoned future.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_task_engine.py` around lines 1066 - 1108, The test
currently bypasses the timeout cleanup by setting eng._running = False and
calling eng._fail_remaining_futures() directly; instead make the engine think
processing is still active by leaving _running True and forcing
eng._processing_task to remain pending (e.g., assign a not-yet-completed
asyncio.Future or a task that awaits an Event) so the engine must take the
timeout branch when stop(timeout=...) is called; then call await
eng.stop(timeout=0.01) and assert the envelope.future was resolved with failure
and contains "shut down" in the error. Ensure you reference the TaskEngine
instance (eng), the _processing_task, stop(timeout=...), and the envelope.future
in the test changes.

…tness

- Add TaskInternalError for internal engine faults (maps to 5xx vs 4xx)
- Thread error_code through all TaskMutationResult failure paths
- Extend _raise_typed_error to cover 'internal' code with -> Never return
- Add TaskInternalError handling in all TaskController endpoints
- Fix bridge leak in _cleanup_on_failure (started_bridge flag was missing)
- Remove phantom 'created_at' from _IMMUTABLE_OVERRIDE_FIELDS (field absent on Task)
- Change transition_task to return tuple[Task, TaskStatus | None] (eliminates extra get_task round-trip in controller)
- Split 1140-line test_task_engine.py into three focused files + helpers
- Move TaskEngine fixtures from helpers to conftest.py (auto-discovery, no F401 hacks)
- Fix all ruff/mypy issues in new test files (TC001, I001, F811, E501, unused-ignore)
Copilot AI review requested due to automatic review settings March 12, 2026 15:33
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 12, 2026 15:34 — with GitHub Actions Inactive
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 27 out of 27 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +651 to +655
merged = task.model_dump()
merged.update(mutation.updates)
updated = Task.model_validate(merged)
await self._persistence.tasks.save(updated)
version = self._bump_version(mutation.task_id)
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_apply_update() calls Task.model_validate(merged) without catching Pydantic validation errors. This means invalid updates (e.g., setting assigned_to while status is CREATED, or passing an unknown field) will be treated as an internal engine fault and returned as error_code="internal" with a generic message, instead of a client-correctable validation failure. Catch validation/ValueError from the re-validation step, log the detailed error server-side, and return a failed TaskMutationResult with error_code="validation" and a scrubbed user-facing message.

Copilot uses AI. Check for mistakes.
Comment on lines +165 to +173
except TimeoutError:
logger.warning(
TASK_ENGINE_DRAIN_TIMEOUT,
remaining=self._queue.qsize(),
)
self._processing_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._processing_task
self._fail_remaining_futures()
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the drain-timeout path, stop() cancels _processing_task and then only fails futures still sitting in _queue. If the processing loop was cancelled while handling an already-dequeued envelope (e.g., blocked in persistence), that envelope's future is no longer in the queue and will never be resolved, potentially hanging callers awaiting submit()/convenience methods. Consider tracking the in-flight envelope (or using a finally block in _processing_loop/_process_one) so cancellation during shutdown reliably completes the currently-processing future with a failure result.

Copilot uses AI. Check for mistakes.
Comment on lines +491 to +505
await self._process_one(envelope)
except Exception:
logger.exception(
TASK_ENGINE_LOOP_ERROR,
error="Unhandled exception in processing loop",
)
if not envelope.future.done():
envelope.future.set_result(
TaskMutationResult(
request_id=envelope.mutation.request_id,
success=False,
error="Internal error in processing loop",
error_code="internal",
),
)
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The processing loop catches Exception around _process_one(), which will also swallow MemoryError/RecursionError. Elsewhere in the codebase, best-effort exception handling consistently re-raises these fatal errors. Add an except MemoryError, RecursionError: raise before the generic except Exception here (and similarly in _process_one) to avoid continuing in a potentially corrupted state.

Copilot uses AI. Check for mistakes.
Comment on lines +521 to +537
except Exception as exc:
internal_msg = f"{type(exc).__name__}: {exc}"
logger.exception(
TASK_ENGINE_MUTATION_FAILED,
mutation_type=mutation.mutation_type,
request_id=mutation.request_id,
error=internal_msg,
)
if not envelope.future.done():
envelope.future.set_result(
TaskMutationResult(
request_id=mutation.request_id,
success=False,
error="Internal error processing mutation",
error_code="internal",
),
)
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_process_one() uses except Exception as exc: to convert all failures into a generic internal error result. This also catches MemoryError/RecursionError, which the rest of the codebase treats as fatal and re-raises even in best-effort blocks. Add a specific except MemoryError, RecursionError: raise before the generic handler so OOM/recursion failures aren’t silently swallowed.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: implement centralized single-writer state coordination (TaskEngine)

2 participants