feat: Implement Celery Runtime with CLI Worker Commands #2

@yasha-dev1

Summary

Implement a Celery runtime that integrates with the CLI, allowing workflows and steps to be scheduled to Celery workers.

Background

The project already has:

  • Celery task definitions in pyworkflow/celery/tasks.py (execute_step_task, start_workflow_task, resume_workflow_task)
  • Runtime abstraction layer in pyworkflow/runtime/ with LocalRuntime
  • CLI structure with command groups in pyworkflow/cli/

Proposed Changes

1. New CLI Command Group: worker

# Start a Celery worker
pyworkflow worker run [OPTIONS]

Options:
  --workflow          Only process workflow tasks (pyworkflow.workflows queue)
  --step              Only process step tasks (pyworkflow.steps queue)  
  --schedule          Only process schedule tasks (pyworkflow.schedules queue)
  --all               Process all queues (default)
  --concurrency N     Number of worker processes (default: auto)
  --loglevel LEVEL    Log level (debug, info, warning, error)

# Show worker status
pyworkflow worker status

# List active workers
pyworkflow worker list
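
As a rough illustration of how the `worker run` flags could map onto queues, here is a minimal sketch using the standard library's argparse rather than whatever CLI framework pyworkflow/cli/ actually uses. The queue names come from the option descriptions above. The helper names (`build_parser`, `resolve_queues`, `worker_argv`), the `info` default for `--loglevel`, and the choice to treat combined flags as a union of queues are assumptions, not part of the proposal.

```python
import argparse

# Queue names as listed in the option descriptions above.
QUEUES = {
    "workflow": "pyworkflow.workflows",
    "step": "pyworkflow.steps",
    "schedule": "pyworkflow.schedules",
}


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the proposed `worker run` options."""
    parser = argparse.ArgumentParser(prog="pyworkflow worker run")
    parser.add_argument("--workflow", action="store_true")
    parser.add_argument("--step", action="store_true")
    parser.add_argument("--schedule", action="store_true")
    parser.add_argument("--all", action="store_true", dest="all_queues")
    parser.add_argument("--concurrency", type=int, default=None)
    # Assumption: the proposal does not state a default log level.
    parser.add_argument(
        "--loglevel",
        default="info",
        choices=["debug", "info", "warning", "error"],
    )
    return parser


def resolve_queues(ns: argparse.Namespace) -> list:
    """Return the queues to consume; no flag (or --all) means every queue."""
    selected = [queue for flag, queue in QUEUES.items() if getattr(ns, flag)]
    if ns.all_queues or not selected:
        return list(QUEUES.values())
    return selected


def worker_argv(ns: argparse.Namespace) -> list:
    """Build an argument list for Celery's `worker` command."""
    argv = [
        "worker",
        "--queues", ",".join(resolve_queues(ns)),
        "--loglevel", ns.loglevel.upper(),
    ]
    # Leaving --concurrency out lets Celery pick its own default ("auto").
    if ns.concurrency is not None:
        argv += ["--concurrency", str(ns.concurrency)]
    return argv
```

The resulting list could then be handed to the Celery app, for example through `app.worker_main(argv)`, assuming the Celery version in use provides it.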

2. New CLI Command: setup

Set up and validate the environment for Celery execution:

pyworkflow setup [OPTIONS]

Options:
  --broker TYPE       Broker type: redis, rabbitmq (default: redis)
  --check             Only check if environment is ready (no modifications)

This command will:

  • Verify broker connectivity
  • Display connection info
  • Optionally create required queues
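
The first two steps could look roughly like the sketch below. It uses only the standard library: it parses the broker URL for display (without echoing the password) and tries a plain TCP connection. The function names are hypothetical, and a TCP check is weaker than a real broker handshake, so an actual implementation would probably go through Celery/kombu's connection API instead.

```python
import socket
from urllib.parse import urlsplit

# Well-known default ports for the two proposed broker types.
DEFAULT_PORTS = {"redis": 6379, "amqp": 5672}


def describe_broker(url: str) -> dict:
    """Extract displayable connection info from a broker URL.

    The password itself is never returned, only whether one is set.
    """
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "host": parts.hostname or "localhost",
        "port": parts.port or DEFAULT_PORTS.get(parts.scheme),
        "user": parts.username,
        "has_password": parts.password is not None,
    }


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    info = describe_broker("redis://localhost:6379/0")
    reachable = can_connect(info["host"], info["port"])
    print(info, "reachable" if reachable else "unreachable")
```

With `--check`, the command would stop after reporting this information and make no changes.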

3. CeleryRuntime Implementation

Create pyworkflow/runtime/celery.py:

class CeleryRuntime(Runtime):
    supports_durable = True
    supports_transient = False
    
    async def start_workflow(...):
        # Dispatch to Celery's start_workflow_task
        
    async def resume_workflow(...):
        # Dispatch to Celery's resume_workflow_task
        
    async def schedule_wake(...):
        # Schedule via Celery Beat or apply_async with countdown
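
To make the dispatch idea more concrete, here is a standalone sketch of the `apply_async` with countdown variant. It deliberately does not subclass `Runtime` or import the real tasks, because their signatures are not shown in this issue: the task objects are injected, and the method parameters (`workflow_name`, `run_id`, `wake_time`), the task argument layout, and the queue chosen for wake-ups are all assumptions. The only Celery API relied on is `Task.apply_async(args=..., kwargs=..., countdown=..., queue=...)`.

```python
from datetime import datetime, timezone
from typing import Any, Optional

WORKFLOW_QUEUE = "pyworkflow.workflows"
SCHEDULE_QUEUE = "pyworkflow.schedules"  # assumption: wake-ups use this queue


def wake_countdown(wake_time: datetime, now: Optional[datetime] = None) -> float:
    """Seconds until wake_time, clamped to zero for times already in the past."""
    now = now or datetime.now(timezone.utc)
    return max(0.0, (wake_time - now).total_seconds())


class CeleryRuntimeSketch:
    """Illustrative only; the real class would subclass pyworkflow's Runtime."""

    supports_durable = True
    supports_transient = False

    def __init__(self, start_task: Any, resume_task: Any) -> None:
        # Any object exposing Celery's Task.apply_async works here, for example
        # start_workflow_task and resume_workflow_task from pyworkflow/celery/tasks.py.
        self._start_task = start_task
        self._resume_task = resume_task

    async def start_workflow(self, workflow_name: str, run_id: str, **workflow_kwargs: Any) -> None:
        # Hypothetical argument layout: (workflow_name, run_id) plus keyword args.
        self._start_task.apply_async(
            args=(workflow_name, run_id),
            kwargs=workflow_kwargs,
            queue=WORKFLOW_QUEUE,
        )

    async def resume_workflow(self, run_id: str) -> None:
        self._resume_task.apply_async(args=(run_id,), queue=WORKFLOW_QUEUE)

    async def schedule_wake(self, run_id: str, wake_time: datetime) -> None:
        # Delay the resume task until wake_time using a countdown in seconds.
        self._resume_task.apply_async(
            args=(run_id,),
            countdown=wake_countdown(wake_time),
            queue=SCHEDULE_QUEUE,
        )
```

Note that `apply_async` publishes the message synchronously, so a real implementation might move the call off the event loop (for example with `asyncio.to_thread`) if broker latency matters.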

4. Broker Configuration

Expose broker configuration in pyworkflow settings:

# pyworkflow.toml
[celery]
broker = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"

# Or, for RabbitMQ (replaces the Redis settings above)
[celery]
broker = "amqp://guest:guest@localhost:5672//"
result_backend = "rpc://"

Environment variables:

  • PYWORKFLOW_CELERY_BROKER
  • PYWORKFLOW_CELERY_RESULT_BACKEND

5. Integration with workflows run

The global --runtime option selects which runtime executes the run:

# Use Celery runtime (dispatches to workers)
pyworkflow --runtime celery workflows run my_workflow --arg foo=bar

# Use local runtime (runs in-process)
pyworkflow --runtime local workflows run my_workflow --arg foo=bar
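
Resolving the `--runtime` value could go through a small name-to-factory registry along these lines. The existing runtime factory's API is not shown in this issue, so `register_runtime`, `get_runtime`, and the two stub classes are hypothetical stand-ins for illustration only.

```python
from typing import Callable, Dict

# Maps the value passed to --runtime onto a zero-argument factory.
_RUNTIMES: Dict[str, Callable[[], object]] = {}


def register_runtime(name: str, factory: Callable[[], object]) -> None:
    """Make a runtime selectable by name."""
    _RUNTIMES[name] = factory


def get_runtime(name: str) -> object:
    """Instantiate the runtime registered under name, or fail with a clear error."""
    try:
        factory = _RUNTIMES[name]
    except KeyError:
        available = ", ".join(sorted(_RUNTIMES))
        raise ValueError(f"Unknown runtime {name!r}; available: {available}") from None
    return factory()


class LocalRuntimeStub:
    """Stand-in for LocalRuntime (runs in-process)."""


class CeleryRuntimeStub:
    """Stand-in for CeleryRuntime (dispatches to workers)."""


register_runtime("local", LocalRuntimeStub)
register_runtime("celery", CeleryRuntimeStub)
```

An unknown name then fails early with a message listing the registered runtimes, rather than failing later during dispatch.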

Implementation Tasks

  • Create pyworkflow/runtime/celery.py with CeleryRuntime class
  • Register CeleryRuntime in runtime factory
  • Create pyworkflow/cli/commands/worker.py with worker command group
  • Create pyworkflow/cli/commands/setup.py with setup command
  • Add --runtime global option to CLI
  • Update configuration to support celery broker settings
  • Add environment variable support for broker URLs
  • Update CLI documentation
  • Add tests for CeleryRuntime
  • Add integration tests for worker commands

Expected Workflow

# 1. Set up the environment (optional, for validation)
pyworkflow setup --broker redis --check

# 2. Start workers in separate terminals
pyworkflow worker run --workflow    # Terminal 1: workflow orchestration
pyworkflow worker run --step        # Terminal 2: step execution
pyworkflow worker run --schedule    # Terminal 3: scheduled tasks

# Or start all-in-one worker
pyworkflow worker run --all

# 3. Run workflows (dispatched to workers)
pyworkflow --runtime celery workflows run my_workflow --arg user_id=123

# 4. Monitor
pyworkflow runs list
pyworkflow runs status run_abc123

Labels

enhancement, cli, celery
