Context
When integrating dotflow into a queued / job-based system (SQS → Lambda, Celery, etc.), we frequently re-trigger the same workflow with the same `workflow_id` but want different idempotency semantics depending on whether the input (`initial_context`) changed:
- Same input, retry/resume — dotflow already handles this beautifully via `resume=True` + a persistent `Storage` provider. Steps with stored checkpoints are skipped, and the workflow continues where it stopped.
- Different input, restart — currently no built-in primitive. The previous run's checkpoints in `Storage` will silently be picked up by `resume=True`, which is wrong: the new input is ignored and the old `parse`/`extract` output is used (see the repro sketch below).
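For concreteness, a minimal repro of the second case. The DotFlow calls follow the API sketch later in this issue; `config` stands in for a persistent-`Storage` config (setup omitted) and the step body is simplified:

```python
from dotflow import DotFlow

config = ...  # persistent Storage-backed config, as in docs_src/checkpoint/checkpoint.py

def parse(context):
    return context  # pretend this does real parsing

# Run 1: executes `parse` and persists its checkpoint under "my-job".
workflow = DotFlow(workflow_id="my-job", config=config)
workflow.task.add(step=[parse], initial_context={"file": "a.csv"})
workflow.start(resume=True)

# Run 2, different input: resume=True finds the stored checkpoint for
# `parse`, skips the step, and {"file": "b.csv"} is silently ignored.
workflow = DotFlow(workflow_id="my-job", config=config)
workflow.task.add(step=[parse], initial_context={"file": "b.csv"})
workflow.start(resume=True)
```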
What we want
A native way for `DotFlow` / `Manager` to detect that `initial_context` differs from a previous run and either:
- Throw / refuse to start (strict),
- Invalidate prior checkpoints automatically (overwrite), or
- Expose a hook (an `on_input_change=...` callback) for the user to decide.
Today the workaround is to:
- Hash the input payload externally,
- Use `workflow_id = f"{job}-{input_hash}"` so each input gets its own checkpoint namespace,
- Manually clean up old prefixes.
This works but is boilerplate every consumer reinvents.
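Concretely, that boilerplate looks something like this (a sketch: `input_fingerprint` is our own helper, not part of dotflow, and `config`/`parse`/`transform`/`load` are as above):

```python
import hashlib
import json

def input_fingerprint(payload: dict) -> str:
    # Canonical JSON so dict key order doesn't change the hash.
    blob = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16]

payload = {"file": "b.csv"}
# Bake the hash into workflow_id: each distinct input gets a fresh
# checkpoint namespace, so resume=True can never reuse stale output.
workflow = DotFlow(workflow_id=f"my-job-{input_fingerprint(payload)}", config=config)
workflow.task.add(step=[parse, transform, load], initial_context=payload)
workflow.start(resume=True)

# Downside: stale "my-job-<old-hash>" prefixes accumulate in Storage and
# have to be garbage-collected by hand.
```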
Proposed API sketch
```python
workflow = DotFlow(workflow_id="my-job", config=config)
workflow.task.add(step=[parse, transform, load], initial_context=payload)

workflow.start(
    resume=True,
    on_input_change="reset",  # "reset" | "reuse" | "raise"
)
```
Or more granular:
```python
workflow.start(
    resume=True,
    fingerprint=hash_of(payload),  # lib stores it next to checkpoints,
                                   # invalidates if mismatch
)
```
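To make the intended semantics concrete, here is one purely illustrative way the fingerprint check could behave inside `start`. Every name here beyond the proposed arguments (`load_fingerprint`/`save_fingerprint`/`clear`, `InputChangedError`, `_execute`) is made up for the sketch, not dotflow's API:

```python
class InputChangedError(RuntimeError):
    """initial_context differs from the run that produced the checkpoints."""

# Imagined as a method on DotFlow / Manager.
def start(self, resume=False, fingerprint=None, on_input_change="reuse"):
    if resume and fingerprint is not None:
        stored = self.storage.load_fingerprint(self.workflow_id)
        if stored is not None and stored != fingerprint:
            if on_input_change == "raise":
                raise InputChangedError(self.workflow_id)
            if on_input_change == "reset":
                # Drop prior checkpoints so every step re-runs against
                # the new initial_context.
                self.storage.clear(self.workflow_id)
            # "reuse": fall through, old checkpoints win (today's behavior).
        self.storage.save_fingerprint(self.workflow_id, fingerprint)
    self._execute(resume=resume)  # existing run loop
```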
Why it matters
Queued/job-based deployments are dotflow's sweet spot (Lambda + SQS, ECS, scheduled runs). Right now, combining `resume=True` with any system where the input might change is footgun-prone unless you read the source carefully.
A documented, built-in idempotency story would prevent silently-wrong runs.
Related
- `docs_src/checkpoint/checkpoint.py` shows the happy path (same input, resume)
- No example shows what happens when the same `workflow_id` gets a different `initial_context`