
feat: initial implementation and introduction of stream workflows and stream steps#259

Open
yasha-dev1 wants to merge 1 commit into main from feat/signals-and-stream-workflows

Conversation

@yasha-dev1 (Collaborator) commented:

New feature: stream workflows and stream steps

@yasha-dev1 yasha-dev1 deployed to tier3-approval March 7, 2026 12:25 — with GitHub Actions Active

github-actions bot commented Mar 7, 2026

🔍 Code Review Agent — Tier 3

Commit: c650973ce6fa
Status: ✅ Complete
Verdict: 🔄 REQUEST_CHANGES — Multiple blocking issues found: (1) consumer silently drops signals on non-memory backends causing data loss, (2) fallback step lookup can dispatch to wrong handler causing runtime errors, and (3) dynamic attribute assignment risks corrupting replay on __slots__-enabled classes.



Summary

This PR introduces a new pyworkflow/streams/ module implementing a pub/sub signal pattern for PyWorkflow. It adds stream workflows (@stream_workflow), reactive stream steps (@stream_step), a signal dispatch pipeline (emit() → dispatch_signal() → on_signal() callback), a checkpoint backend abstraction, and a background StreamConsumer. Corresponding changes to StorageBackend (as optional, non-abstract extension methods), InMemoryStorageBackend, EventType/Event, and replay.py support the new system. Tests cover unit-level behavior of individual modules plus integration-level lifecycle scenarios.


Risk Assessment

Tier 3 — confirmed. Both pyworkflow/engine/replay.py and pyworkflow/storage/base.py are modified directly. The stream module is new, but it is wired into the core event system and replay engine.


Issues

Blocking

1. consumer.py:76–84 — Consumer silently no-ops on all non-memory backends

_process_pending_signals discovers streams and subscriptions by directly reading private attributes on the storage object:

streams = getattr(self._storage, "_streams", {})
...
if hasattr(self._storage, "_subscriptions"):
    for (sid, _), sub in self._storage._subscriptions.items():

getattr(..., "_streams", {}) returns {} for every storage backend except InMemoryStorageBackend. On Redis, SQLite, PostgreSQL, or Citus backends the consumer loop runs forever but never processes any signals — no error, no log, no indication anything is wrong. The public API (get_waiting_steps, get_pending_signals) already exists on StorageBackend and should be used instead.
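A minimal sketch of the suggested fix, polling through public backend methods rather than private attributes so any conforming backend is supported. FakeBackend, process_pending_signals, and the method signatures here are illustrative stand-ins, not the actual PyWorkflow API:

```python
class FakeBackend:
    """Stand-in for any StorageBackend implementation (memory, Redis, SQL, ...)."""
    def __init__(self, pending):
        self._pending = pending  # {step_name: [signal, ...]}

    def get_waiting_steps(self):
        # Public API: which steps are waiting on signals.
        return list(self._pending)

    def get_pending_signals(self, step):
        # Public API: drain pending signals for one step.
        return self._pending.pop(step, [])


def process_pending_signals(storage, handler):
    """Dispatch every pending signal via the public API; return the count."""
    processed = 0
    for step in storage.get_waiting_steps():
        for signal in storage.get_pending_signals(step):
            handler(step, signal)
            processed += 1
    return processed


received = []
backend = FakeBackend({"step_a": ["task.created"], "step_b": ["result.ready"]})
count = process_pending_signals(backend, lambda s, sig: received.append((s, sig)))
```

Returning the processed count from the loop would also fix the related poll_once() issue below, since the consumer would have a real number to report.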

2. dispatcher.py:229–232 — Fallback in _find_step_metadata_for_run dispatches to wrong step

The primary lookup (name substring match) at line 227 already has ambiguity risk (e.g., step named "worker" matches "super_worker_abc123"). The fallback at lines 229–232 compounds this by returning any step registered on the stream, regardless of whether it handles the signal type:

# Fallback: return the first step on this stream that subscribes to this signal
for step_meta in all_steps.values():
    if step_meta.stream == stream_id:
        return step_meta

The comment says "subscribes to this signal" but the code does not filter by signal_type in step_meta.signal_types. If Stream A has two steps — one for "task.created" and one for "result.ready" — the fallback will invoke the wrong step's on_signal callback with a signal it does not handle.
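A sketch of the corrected fallback, filtering by both stream and declared signal subscription so the behavior matches the comment. StepMeta here is a simplified stand-in for the real metadata class:

```python
from dataclasses import dataclass


@dataclass
class StepMeta:
    name: str
    stream: str
    signal_types: list


def find_fallback_step(all_steps, stream_id, signal_type):
    """Return the first step on this stream that actually subscribes to signal_type."""
    for step_meta in all_steps.values():
        # Filter on both conditions the original comment claims to check.
        if step_meta.stream == stream_id and signal_type in step_meta.signal_types:
            return step_meta
    return None


steps = {
    "a": StepMeta("on_task", "stream-1", ["task.created"]),
    "b": StepMeta("on_result", "stream-1", ["result.ready"]),
}
match = find_fallback_step(steps, "stream-1", "result.ready")
```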


Warnings

3. dispatcher.py:206–213 — Empty stream_id and step_name recorded in cancellation events

_cancel_step calls create_stream_step_completed_event with hardcoded empty strings:

event = create_stream_step_completed_event(
    run_id=step_run_id,
    stream_id="",  # Will be filled from context
    step_name="",
    ...
)

There is no code that actually fills these from context. Every cancellation event will be persisted with stream_id="" and step_name="", making the event log useless for auditing cancelled stream steps.
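One possible shape of the fix, sketched with a hypothetical registry lookup: resolve the real stream_id and step_name from the step's metadata before building the event, rather than persisting empty strings:

```python
def build_cancellation_event(step_run_id, step_registry):
    """Look up the step's metadata so the event carries real identifiers."""
    meta = step_registry[step_run_id]  # raise loudly if metadata is missing
    return {
        "run_id": step_run_id,
        "stream_id": meta["stream_id"],
        "step_name": meta["step_name"],
        "event_type": "stream_step_completed",
    }


registry = {"run-42": {"stream_id": "stream-1", "step_name": "on_task"}}
event = build_cancellation_event("run-42", registry)
```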

4. dispatcher.py:141–143 — Bare except Exception: pass swallows storage errors silently

except Exception:
    pass  # Best-effort event logging

This violates CLAUDE.md's explicit rule: never swallow exceptions silently. A storage failure (connection loss, disk full, etc.) during record_event will be hidden. At minimum this should logger.warning(...) the exception so operators can detect storage degradation.
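A sketch of the recommended change (logger name and helper are illustrative): keep the best-effort semantics but make failures observable:

```python
import logging

logger = logging.getLogger("pyworkflow.streams.dispatcher")


def record_event_best_effort(storage, event):
    """Best-effort event logging that surfaces, rather than hides, storage failures."""
    try:
        storage.record_event(event)
        return True
    except Exception as exc:
        # Still non-fatal, but operators can now detect storage degradation.
        logger.warning("record_event failed; event dropped: %s", exc)
        return False


class BrokenStorage:
    def record_event(self, event):
        raise ConnectionError("storage unavailable")


ok = record_event_best_effort(BrokenStorage(), {"type": "signal"})
```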

5. consumer.py:124 — poll_once() always returns 0

The public signature and docstring say the function returns the number of signals processed. The implementation always returns 0:

return 0  # Consumer doesn't track count currently

Callers that use the return value for monitoring or test assertions will get misleading results.

6. replay.py:270,291 — Dynamic attribute assignment on LocalContext

Signal wait and received-signal state is stored as dynamically-added attributes:

if not hasattr(ctx, "_signal_waits"):
    ctx._signal_waits = {}

If LocalContext declares __slots__, this will raise AttributeError at runtime during replay — silently corrupting workflow state replay (a Tier 3 critical path). Even without __slots__, relying on hasattr guards for state initialization is fragile and bypasses any type checking.
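A sketch of the safer alternative: declare the signal state up front in __init__ so it coexists with __slots__ and needs no hasattr guards. LocalContext here is a simplified stand-in for the real class:

```python
class LocalContext:
    # Declaring the attributes makes them __slots__-safe and type-checkable.
    __slots__ = ("run_id", "_signal_waits", "_received_signals")

    def __init__(self, run_id):
        self.run_id = run_id
        self._signal_waits = {}      # signal_type -> waiter state
        self._received_signals = []  # replayed signals, in arrival order


ctx = LocalContext("run-1")
ctx._signal_waits["task.created"] = {"step": "on_task"}
```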


Architecture

The streams module is a new top-level package that is not listed in harness.config.json's architecturalBoundaries. Its boundary policy is therefore undefined and unenforced by CI.

Within the existing constraints, dispatcher.py makes deferred imports from pyworkflow.engine.events, pyworkflow.serialization.encoder, pyworkflow.config, and pyworkflow.runtime. These imports go upward from a peer module into the core runtime — since streams' allowed imports are not yet codified, these need to be explicitly documented and added to the harness config to prevent drift.

No import from celery/ inside core/ or engine/ was observed — the existing core → engine → celery layering rule is not violated.


Test Coverage

Unit tests cover Signal/Stream creation, emit sequencing, all storage CRUD operations (publish, subscribe, acknowledge, checkpoint), and decorator/registry behavior. Integration tests cover the on_signal lifecycle, ctx.resume()/ctx.cancel(), multi-signal dispatch filtering, Pydantic schema validation, and checkpoint save/load within context.

Gaps:


🤖 Code Review Agent — automated code review.

