feat(storage): add CitusStorageBackend for distributed PostgreSQL #238

Merged: yasha-dev1 merged 3 commits into main from fix/add-support-for-citus-in-pg on Mar 1, 2026
@yasha-dev1
Collaborator

Summary

  • Adds CitusStorageBackend that extends PostgresStorageBackend, reusing all query logic and only overriding _initialize_schema() to produce Citus-compatible DDL
  • Adds CitusMigrationRunner that extends PostgresMigrationRunner to handle V2 migration without adding cross-shard-incompatible UNIQUE constraints
  • Wires up PYWORKFLOW_STORAGE_TYPE=citus env var support (reuses existing PYWORKFLOW_POSTGRES_* vars)
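A minimal sketch of how the env-var wiring in the last bullet might look. The PR only confirms `PYWORKFLOW_STORAGE_TYPE=citus` and the `PYWORKFLOW_POSTGRES_*` prefix; the `load_storage_config` helper, the resulting dict layout, and the `HOST` suffix used in the usage note are assumptions for illustration.

```python
from collections.abc import Mapping

# Only this prefix is confirmed by the PR; the suffixes after it are not.
POSTGRES_PREFIX = "PYWORKFLOW_POSTGRES_"


def load_storage_config(env: Mapping[str, str]) -> dict[str, str]:
    """Hypothetical loader: build a storage config dict from env-style variables."""
    storage_type = env.get("PYWORKFLOW_STORAGE_TYPE", "").lower()
    if storage_type not in {"postgres", "citus"}:
        raise ValueError(f"unsupported storage type: {storage_type!r}")
    config = {"type": storage_type}
    # The citus backend reuses the same connection variables as postgres.
    for name, value in env.items():
        if name.startswith(POSTGRES_PREFIX):
            config[name[len(POSTGRES_PREFIX):].lower()] = value
    return config
```

With `{"PYWORKFLOW_STORAGE_TYPE": "citus", "PYWORKFLOW_POSTGRES_HOST": "db"}` as input, this sketch returns a dict with `type` set to `citus` and `host` set to `db`.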

Schema changes vs plain PostgreSQL

| Table | Change |
|---|---|
| events | PK: event_id → (run_id, event_id) |
| steps | PK: step_id → (run_id, step_id) + shard-local idx_steps_step_id |
| workflow_runs | parent_run_id FK dropped; idempotency_key unique index → plain index |
| hooks | UNIQUE on token → plain index |
| cancellation_flags | FK on run_id dropped |
| schema_versions | Becomes a Citus reference table (replicated to all workers) |

Distribution: workflow_runs, events, steps, hooks, cancellation_flags co-located on run_id; schedules distributed on schedule_id. Table distribution is idempotent (uses pg_class join on pg_dist_partition to skip already-distributed tables).
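The idempotent distribution step can be sketched roughly as follows. This is an illustration under assumptions, not the code in citus.py: `DISTRIBUTION_STATEMENTS` and `pending_statements` are hypothetical names, and the set of already-distributed tables is assumed to come from the `pg_dist_partition` lookup described above.

```python
# One constant statement per table. workflow_runs comes first because the
# other run_id tables are co-located with it.
DISTRIBUTION_STATEMENTS = {
    "workflow_runs": "SELECT create_distributed_table('workflow_runs', 'run_id')",
    "events": (
        "SELECT create_distributed_table('events', 'run_id', "
        "colocate_with => 'workflow_runs')"
    ),
    "steps": (
        "SELECT create_distributed_table('steps', 'run_id', "
        "colocate_with => 'workflow_runs')"
    ),
    "hooks": (
        "SELECT create_distributed_table('hooks', 'run_id', "
        "colocate_with => 'workflow_runs')"
    ),
    "cancellation_flags": (
        "SELECT create_distributed_table('cancellation_flags', 'run_id', "
        "colocate_with => 'workflow_runs')"
    ),
    "schedules": "SELECT create_distributed_table('schedules', 'schedule_id')",
}


def pending_statements(already_distributed: set[str]) -> list[str]:
    """Return the statements still to run, skipping already-distributed tables."""
    return [
        sql
        for table, sql in DISTRIBUTION_STATEMENTS.items()
        if table not in already_distributed
    ]
```

Running the planner a second time with every table already distributed yields an empty list, which is what makes a repeated schema initialization a no-op in this sketch.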

Risk tier

Tier 2 — new storage backend (additive); no changes to existing backends, engine, or executor.

Test plan

  • 16 new unit tests covering init, inheritance, storage_to_config() round-trips, config_to_storage(), caching, and env-var loading
  • All 756 existing unit tests pass
  • ruff check clean
  • mypy — no new errors in changed files

🤖 Generated with Claude Code

Adds a new CitusStorageBackend that extends PostgresStorageBackend,
reusing all query logic while overriding schema initialization to create
Citus-compatible distributed tables (composite PKs, no cross-shard FKs,
reference table for schema_versions).

Distribution strategy:
- workflow_runs, events, steps, hooks, cancellation_flags: sharded on run_id
- schedules: sharded on schedule_id
- schema_versions: reference table (replicated to all workers)

Enables PYWORKFLOW_STORAGE_TYPE=citus using the existing
PYWORKFLOW_POSTGRES_* env vars.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@github-actions
Contributor

github-actions bot commented Feb 28, 2026

🔍 Code Review Agent — Tier 2

Commit: 08f3fbd2275e
Status: ✅ Complete
Verdict: 💬 COMMENT — Warnings identified (misleading comments and convention violation) but no blocking issues preventing merge; code is functionally correct and safe

Now I have a thorough understanding of the PR. Here is the code review:


Summary

This PR adds CitusStorageBackend, a distributed PostgreSQL storage backend built on top of CitusData. The implementation extends PostgresStorageBackend, reusing all query logic while overriding only _initialize_schema to emit Citus-compatible DDL (composite PKs, no cross-shard FKs, non-unique idempotency-key index, reference table for schema_versions). A companion CitusMigrationRunner handles the V2 migration without adding the UNIQUE constraint that Citus cannot enforce. Changes to storage/config.py and config.py wire the new backend into the config round-trip and environment-variable loading path. Unit tests cover initialization, config round-trips, caching, and env-var loading.


Risk Assessment

Tier 2 is appropriate. All changed files are in pyworkflow/storage/ (new module + config wiring) and tests/. None of the Tier 3 critical paths (executor.py, replay.py, tasks.py, storage/base.py, pyproject.toml) are touched.


Issues

1. (warning) pyworkflow/storage/citus.py, lines 209–213 / 236–239 / 262–264 — Misleading comments claim "FK on run_id retained" for events, steps, and hooks, but the actual DDL for those tables contains no REFERENCES clause. The PostgreSQL backend defines REFERENCES workflow_runs(run_id) ON DELETE CASCADE on all three tables. The Citus DDL silently omits these FKs. While there is currently no delete_run operation in the storage interface, the discrepancy between comments and actual DDL will mislead a future maintainer who adds a run-deletion operation and expects cascade behaviour to match the Postgres backend.

2. (warning) pyworkflow/storage/citus.py, lines 362–364 — F-string SQL is used to interpolate hardcoded table names into create_distributed_table(...) calls. The values are all compile-time constants from colocated_run_id, so there is no injection risk today. However, this pattern technically violates the project's "never f-string SQL" convention from CLAUDE.md. At a minimum a comment should explain why parameterization is not possible for DDL identifiers here, to prevent future contributors from following the pattern with user-supplied input.
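One possible way to avoid the f-string in issue 2 is sketched below. It relies on the fact that `create_distributed_table` takes the table name as a function argument (of type `regclass`) rather than as a SQL identifier, so the name can in principle travel as a bind parameter. The allowlist, the `distribution_call` helper, and the `$1::text::regclass` cast with asyncpg-style placeholders are assumptions and have not been run against a live Citus cluster.

```python
# Hypothetical allowlist of tables co-located with workflow_runs.
ALLOWED_TABLES = frozenset({"events", "steps", "hooks", "cancellation_flags"})

# Constant SQL text: the table name is passed as a bind parameter,
# so no user-controlled value is ever formatted into the string.
DISTRIBUTE_SQL = (
    "SELECT create_distributed_table($1::text::regclass, 'run_id', "
    "colocate_with => 'workflow_runs')"
)


def distribution_call(table: str) -> tuple[str, tuple[str, ...]]:
    """Return (sql, args) for one table, rejecting names outside the allowlist."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f"refusing to distribute unknown table: {table!r}")
    return DISTRIBUTE_SQL, (table,)
```

With asyncpg the result would be used as `sql, args = distribution_call("events")` followed by `await conn.execute(sql, *args)`. The allowlist check is kept even though the value is bound, so that a future change to the source of `table` fails loudly.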


Architecture

The dependency rule across the core, engine, and celery layers is respected. citus.py imports only from pyworkflow.storage.*, which is allowed. storage/config.py and pyworkflow/config.py use lazy imports inside conditionals to avoid circular dependencies. No celery/ module is imported from core/ or engine/. Architecture is compliant.


Test Coverage

The unit tests are well-structured and cover the key non-live paths: constructor parameters, inheritance assertions, storage_to_config/config_to_storage round-trips (with and without DSN), per-process caching, and env-var loading. The test file correctly documents that _initialize_schema and _distribute_tables are not tested here and require a live Citus instance (integration tests). Coverage is adequate for a Tier 2 PR given the live-cluster dependency.


🤖 Code Review Agent — automated code review.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@github-actions
Contributor

github-actions bot commented Mar 1, 2026

🔄 New commits pushed — requesting re-review.

Commit: 78208da790d2
sha:78208da790d2a70ac5b22caf20b425707011b33f

@github-actions
Contributor

github-actions bot commented Mar 1, 2026

🔍 Code Review Agent — Tier 2

Commit: 78208da790d2
Status: ✅ Complete

Now I have a complete picture of the changes. Let me write the review.


Summary

This PR adds CitusStorageBackend, a distributed PostgreSQL storage backend that extends PostgresStorageBackend with Citus-specific DDL. Key changes include: composite primary keys that include the distribution column (run_id), dropping cross-shard foreign key and unique constraints, a CitusMigrationRunner that overrides the V2 migration to skip unsupported unique constraint DDL, and wiring the new "citus" type throughout config.py, storage/config.py, and storage/__init__.py. Unit tests cover initialization, config round-trips, and env var loading, but do not exercise the actual Citus schema/distribution logic.

Risk Assessment

Tier 2 is correct. The changes add a new storage backend without touching any Tier 3 files (base.py, executor.py, replay.py, tasks.py). The new backend inherits all query logic from the existing Postgres backend; only DDL and distribution differ.

Issues

  1. [warning] pyworkflow/storage/citus.py, lines 361–364: F-strings are used to build SQL with hardcoded table names in _distribute_tables:

    await conn.execute(
        f"SELECT create_distributed_table('{table}', 'run_id', "
        f"colocate_with => 'workflow_runs')"
    )

    The values come from a hardcoded list and there is no actual injection risk, but CLAUDE.md explicitly prohibits f-string SQL in storage backends. The same pattern applies to the create_reference_table call at line 374. These calls can use string formatting safely here, but the convention violation is worth noting.

  2. [warning] pyworkflow/storage/citus.py, lines 344–348: The pg_dist_partition query in _distribute_tables joins to pg_class without filtering by pg_namespace. In a database with multiple schemas containing tables of the same name (e.g., public.events and audit.events), this could incorrectly mark a table as already-distributed and skip Citus distribution for the intended table. Most deployments use only the default schema, so this is a low-probability issue, but it is a latent correctness bug in multi-schema environments.

  3. [suggestion] tests/unit/backends/test_citus_storage.py, lines 23–35: The mock_citus_backend fixture is defined but never used by any test function in the file. This is dead code — the fixture was presumably created to test _initialize_schema or _distribute_tables, but no such tests were written.

  4. [suggestion] pyworkflow/storage/citus.py, lines 149–325: The schema creation DDL in _initialize_schema (tables and indexes) runs inside an acquired connection but without an explicit transaction. A partial failure (e.g., after creating workflow_runs but before events) leaves the database in an inconsistent state. Subsequent re-runs recover via IF NOT EXISTS, but if table creation succeeds while index creation fails, the state is harder to reason about. This mirrors the parent class behavior so it is not a regression, but it is worth flagging.

  5. [suggestion] pyworkflow/storage/citus.py, line 40: from datetime import UTC, datetime is a local import inside apply_migration. Per project convention, imports should be at the top of the file. This should be moved to the module level.
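The multi-schema concern in issue 2 above could be addressed by joining to `pg_namespace`. The query below is a sketch, not the query in citus.py; the constant name and the helper are hypothetical, and the `$1` placeholder assumes asyncpg. The small pure-Python function mirrors the `WHERE` clause so the effect can be checked without a database.

```python
# Schema-qualified lookup of tables that Citus already distributes.
DISTRIBUTED_TABLES_SQL = """
SELECT c.relname
FROM pg_dist_partition p
JOIN pg_class c ON c.oid = p.logicalrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1
"""


def distributed_in_schema(rows: list[tuple[str, str]], schema: str) -> set[str]:
    """Filter (schema, table) rows the same way the WHERE clause does."""
    return {table for row_schema, table in rows if row_schema == schema}
```

Given a distributed `audit.events` and an undistributed `public.events`, the filter for `public` does not report `events`, so the intended table would still be distributed.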

Architecture

The changes comply with the dependency rules. CitusStorageBackend and CitusMigrationRunner live in pyworkflow/storage/, import only from pyworkflow.storage.postgres and pyworkflow.storage.migrations, and introduce no upward dependencies. The storage/config.py and storage/__init__.py additions follow the existing optional-import pattern used for all other optional backends.

Test Coverage

Tests cover initialization parameters, DSN handling, inheritance assertions, config serialization round-trips, and env var loading. These are adequate for the configuration-layer changes. However, the core Citus-specific logic — _initialize_schema, _distribute_tables, and CitusMigrationRunner.apply_migration — has no test coverage at all. The unused mock_citus_backend fixture suggests this was intended but not completed. For a storage backend that manages distributed schema creation, the absence of mocked tests for these methods is a gap worth addressing before the backend is used in production.
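The gap described above could be narrowed with mock-based tests along these lines. `distribute_events_if_needed` is a hypothetical stand-in for one step of `_distribute_tables`, written only so the sketch is self-contained; the real method's structure may differ. The connection is an `AsyncMock` exposing asyncpg-style `fetchval` and `execute`.

```python
import asyncio
from unittest.mock import AsyncMock

CHECK_SQL = (
    "SELECT EXISTS (SELECT 1 FROM pg_dist_partition "
    "WHERE logicalrelid = 'events'::regclass)"
)
DISTRIBUTE_SQL = (
    "SELECT create_distributed_table('events', 'run_id', "
    "colocate_with => 'workflow_runs')"
)


async def distribute_events_if_needed(conn) -> bool:
    """Hypothetical step: distribute 'events' unless Citus already tracks it."""
    if await conn.fetchval(CHECK_SQL):
        return False
    await conn.execute(DISTRIBUTE_SQL)
    return True


def run_with_mock(already_distributed: bool) -> tuple[bool, AsyncMock]:
    """Drive the step against a mocked connection and return the mock for assertions."""
    conn = AsyncMock()
    conn.fetchval.return_value = already_distributed
    result = asyncio.run(distribute_events_if_needed(conn))
    return result, conn
```

A test can then assert that `conn.execute` is not awaited when the table is already distributed and is awaited exactly once with `DISTRIBUTE_SQL` otherwise, without requiring a live Citus instance.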


🤖 Code Review Agent — automated code review.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@github-actions
Contributor

github-actions bot commented Mar 1, 2026

🔄 New commits pushed — requesting re-review.

Commit: ffcdf02d9b82
sha:ffcdf02d9b82e29576f6eb01197b333d77060a90

@github-actions
Contributor

github-actions bot commented Mar 1, 2026

🔍 Code Review Agent — Tier 2

Commit: ffcdf02d9b82
Status: ✅ Complete
Verdict: 💬 COMMENT — Multiple warnings and suggestions identified but no blocking issues. Security and behavioral concerns should be mitigated with stronger documentation and test coverage, but no bugs causing runtime errors or immediate security vulnerabilities are present.

Now I have enough information to write a complete review.


Code Review: CitusStorageBackend

Summary

This PR adds a CitusStorageBackend that extends PostgresStorageBackend with Citus-compatible DDL — composite primary keys, removed cross-shard foreign keys, and downgraded unique constraints. It also adds a CitusMigrationRunner that skips the V2 unique-constraint step incompatible with Citus, and wires the new backend into the config serialization/deserialization pipeline and the env-var loader. The approach of inheriting all query logic and overriding only schema initialization is clean and appropriate.

Risk Assessment

Tier 2 — confirmed. Changes are confined to pyworkflow/storage/ (a new file, __init__.py, and config.py) and pyworkflow/config.py. No Tier 3 files are touched (base.py, executor.py, replay.py, tasks.py, pyproject.toml are all unmodified).

Issues

  1. Warning: pyworkflow/storage/citus.py, lines 358–362: F-string SQL for table identifiers violates the project's explicit security rule ("never f-string SQL"). While the values are hardcoded constants from the colocated_run_id list and carry no actual injection risk, this sets a dangerous precedent if the pattern is copied and the source of table ever changes. PostgreSQL doesn't support parameterized identifiers, so the workaround would be a hardcoded string per table instead of looping with an f-string.

    await conn.execute(
        f"SELECT create_distributed_table('{table}', 'run_id', "
        f"colocate_with => 'workflow_runs')"
    )
  2. Warning: pyworkflow/storage/citus.py, lines 199–202: The unique index on idempotency_key is silently downgraded to a non-unique index. The parent PostgresStorageBackend enforces CREATE UNIQUE INDEX idx_runs_idempotency_key, which is the database-level guarantee that prevents duplicate workflow runs when the caller provides the same idempotency key concurrently. Without it, two concurrent create_run calls with identical idempotency_key values will both succeed on a Citus cluster, producing duplicate runs that get_run_by_idempotency_key (postgres.py:540) will resolve arbitrarily. The comment acknowledges this but there is no compensating advisory lock or application-level guard documented. Operators migrating from PostgreSQL to Citus need an explicit warning that idempotency-key deduplication is application-layer only.

  3. Warning: pyworkflow/storage/citus.py, lines 265–280: token TEXT UNIQUE NOT NULL in the parent (postgres.py:379) becomes token TEXT NOT NULL with a non-unique index. The comment "token = run_id:hook_id, collisions impossible" documents the assumption that token uniqueness is guaranteed by construction. If that invariant ever breaks (e.g., due to a bug in token generation), the database will not catch it and get_hook_by_token (postgres.py:986) will return an arbitrary row. This is at least worth a documented assertion or an integration-test guard.

  4. Suggestion: tests/unit/backends/test_citus_storage.py, lines 23–35: The mock_citus_backend fixture is defined with a fully mocked pool and connection, but no test class actually injects or uses it. It is dead code. If the fixture was created for planned tests covering _initialize_schema, _distribute_tables, or the V2 migration logic, those tests are missing. If it will never be used, it should be removed.

  5. Suggestion: tests/unit/backends/test_citus_storage.py (entire file): The Citus-specific DDL paths (_initialize_schema, _distribute_tables, and CitusMigrationRunner.apply_migration for version 2) have zero test coverage. The unit test suite only covers config serialization and backend instantiation. Mock-based tests for the schema initialization path (using mock_citus_backend) would catch regressions in the distribution DDL without requiring a live Citus cluster.
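For the idempotency-key concern in issue 2 above, one application-level guard might be a transaction-scoped advisory lock taken before the lookup-then-insert. The sketch below only shows how a lock key could be derived; `advisory_lock_key` is a hypothetical helper, not part of the PR. It also assumes that every writer takes the lock on the same node, since PostgreSQL advisory locks are local to the server that grants them; whether that holds for a given Citus topology should be verified.

```python
import hashlib

# pg_advisory_xact_lock(bigint) holds the lock until the transaction ends.
ADVISORY_LOCK_SQL = "SELECT pg_advisory_xact_lock($1)"


def advisory_lock_key(idempotency_key: str) -> int:
    """Map an idempotency key to a signed 64-bit integer usable as a lock key."""
    digest = hashlib.sha256(idempotency_key.encode("utf-8")).digest()
    # Take the first 8 bytes so the value fits PostgreSQL's bigint range.
    return int.from_bytes(digest[:8], byteorder="big", signed=True)
```

Inside one transaction, the caller would execute `ADVISORY_LOCK_SQL` with this key, then check for an existing run with the same idempotency key, and insert only if none is found. Two different keys can hash to the same lock key, which only causes extra serialization, not incorrect results.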

Architecture

Compliant. citus.py imports only from pyworkflow.storage.migrations and pyworkflow.storage.postgres, staying within the storage layer. config.py and __init__.py use lazy imports to avoid hard dependencies on asyncpg. No imports from celery/, engine/, or core/ appear in any changed file.

Test Coverage

Adequate for config plumbing (serialization, env-var loading, cache behavior, round-trips). Insufficient for the core Citus-specific logic: schema initialization, table distribution, and the migration runner's Citus branch are entirely untested at the unit level.


🤖 Code Review Agent — automated code review.

@yasha-dev1 merged commit 2aed3e2 into main on Mar 1, 2026
21 checks passed
@yasha-dev1 deleted the fix/add-support-for-citus-in-pg branch on March 1, 2026 11:53