OPS-001: Scheduling and automation #24

@AliiiBenn


Overview

Implement scheduling and automation capabilities so the wareflow pipeline can run unattended on a recurring basis (daily, weekly, etc.).

Description

Scheduling and automation enables:

  • Automated daily pipeline execution
  • Scheduled report generation and distribution
  • Unattended operation (fire-and-forget)
  • Error notifications and alerts
  • Pipeline history and logging

Technical Approach

Built-in Scheduler

# src/wareflow_analysis/scheduler.py

import schedule
import time
from datetime import datetime
from pathlib import Path
from run.pipeline import PipelineRunner

class WareflowScheduler:
    """Schedule and automate wareflow pipeline execution."""

    def __init__(self, project_dir: Path, config_path: Path | None = None):
        self.project_dir = project_dir
        self.config_path = config_path or project_dir / "config" / "schedule.yaml"
        self.pipeline = PipelineRunner(project_dir)
        self.load_config()

    def load_config(self):
        """Load scheduling configuration."""
        import yaml
        if self.config_path.exists():
            with open(self.config_path) as f:
                self.config = yaml.safe_load(f)
        else:
            self.config = {"jobs": []}

    def run_pipeline(self, job_name: str):
        """Run pipeline with logging and error handling."""
        print(f"\n{'='*60}")
        print(f"Scheduled Job: {job_name}")
        print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{'='*60}\n")

        try:
            results = self.pipeline.run_full_pipeline()

            # Log results
            self.log_results(job_name, results)

            # Send notifications if configured
            if self.config.get('notifications'):
                self.send_notifications(job_name, results)

        except Exception as e:
            print(f"❌ Pipeline failed: {e}")
            self.log_error(job_name, e)
            self.send_alert(job_name, e)

    def setup_schedules(self):
        """Setup scheduled jobs from configuration."""
        for job in self.config.get('jobs', []):
            if job.get('enabled', True):
                frequency = job['frequency']
                time_str = job.get('time', '02:00')

                if frequency == 'daily':
                    schedule.every().day.at(time_str).do(
                        self.run_pipeline, job['name']
                    )
                elif frequency == 'weekly':
                    day = job.get('day', 'monday')
                    getattr(schedule.every(), day.lower()).at(time_str).do(
                        self.run_pipeline, job['name']
                    )
                elif frequency == 'hourly':
                    schedule.every().hour.do(
                        self.run_pipeline, job['name']
                    )

    def start(self):
        """Start the scheduler."""
        print("🗓️  Wareflow Scheduler Started")
        print(f"Project: {self.project_dir}")
        print(f"Config: {self.config_path}")
        print("\nScheduled Jobs:")
        for job in self.config.get('jobs', []):
            if job.get('enabled', True):
                print(f"  ✓ {job['name']}: {job['frequency']} at {job.get('time', '02:00')}")
        print("\nScheduler running. Press Ctrl+C to stop.\n")

        self.setup_schedules()

        try:
            while True:
                schedule.run_pending()
                time.sleep(60)  # Check every minute
        except KeyboardInterrupt:
            print("\n\n🛑 Scheduler stopped by user")

    def log_results(self, job_name: str, results: dict):
        """Log pipeline execution results."""
        log_dir = self.project_dir / ".wareflow" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = log_dir / f"pipeline_{timestamp}.log"

        with open(log_file, 'w') as f:
            f.write(f"Job: {job_name}\n")
            f.write(f"Timestamp: {datetime.now().isoformat()}\n")
            f.write(f"Status: {results['status']}\n")
            f.write("\nResults:\n")
            for step, result in results.items():
                if step not in ['status', 'errors']:
                    f.write(f"  {step}: {result}\n")
            if results.get('errors'):
                f.write("\nErrors:\n")
                for error in results['errors']:
                    f.write(f"  - {error}\n")

Configuration Structure

Schedule Configuration

config/schedule.yaml:

jobs:
  - name: "daily_pipeline"
    enabled: true
    frequency: "daily"
    time: "02:00"  # 2 AM
    warehouses:
      - paris
      - lyon
      - marseille
    steps:
      - import
      - analyze
      - export
    export:
      output_dir: "output/reports/daily"
      filename_pattern: "warehouse_report_{warehouse}_{date}.xlsx"
      consolidate: true

  - name: "weekly_summary"
    enabled: true
    frequency: "weekly"
    day: "friday"
    time: "08:00"
    warehouses: "all"
    steps:
      - analyze
    export:
      output_dir: "output/reports/weekly"
      filename_pattern: "weekly_summary_{date}.xlsx"
      consolidate: true
      email:
        to: ["execs@company.com"]
        subject: "Weekly Warehouse Summary"

notifications:
  on_success:
    email: ["warehouse-ops@company.com"]
  on_failure:
    email: ["warehouse-ops@company.com", "it-alerts@company.com"]
    slack:
      webhook_url: "${SLACK_WEBHOOK_URL}"
      channel: "#warehouse-alerts"

logging:
  level: "INFO"
  retention_days: 30
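
The scheduler should reject a bad configuration before starting. A minimal validation sketch, assuming the YAML above has already been parsed into a dict; the function name and exact checks are illustrative:

```python
# Minimal schedule-config validation sketch. Key names mirror schedule.yaml above.
VALID_FREQUENCIES = {"daily", "weekly", "hourly"}

def validate_schedule_config(config: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []
    jobs = config.get("jobs", [])
    if not jobs:
        problems.append("no jobs defined")
    for i, job in enumerate(jobs):
        if "name" not in job:
            problems.append(f"jobs[{i}]: missing 'name'")
        freq = job.get("frequency")
        if freq not in VALID_FREQUENCIES:
            problems.append(f"jobs[{i}]: unknown frequency {freq!r}")
        if freq == "weekly" and "day" not in job:
            problems.append(f"jobs[{i}]: weekly jobs need a 'day'")
    return problems
```

Returning a list of problems (rather than raising on the first) lets `wareflow scheduler validate` report everything wrong in one pass.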

Implementation Plan

Phase 1: Core Scheduler (1-2 days)

  • Implement WareflowScheduler class
  • Load schedule configuration
  • Execute pipeline on schedule
  • Log results and errors

Phase 2: Enhanced Features (1 day)

  • Email notifications
  • Slack integration (webhooks)
  • Pipeline history tracking
  • Configuration validation

CLI Usage

# Start scheduler
wareflow scheduler start

# Run scheduled jobs manually (test)
wareflow scheduler run --job daily_pipeline

# Validate schedule configuration
wareflow scheduler validate

# Show next scheduled runs
wareflow scheduler next

# View schedule history
wareflow scheduler history
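
`scheduler next` can compute upcoming runs directly from the config rather than from scheduler internals. A sketch for the daily case, assuming the HH:MM time strings used in schedule.yaml (the helper name is hypothetical):

```python
from datetime import datetime, timedelta

def next_daily_run(now: datetime, time_str: str) -> datetime:
    """Next occurrence of a daily HH:MM slot: today if still ahead, else tomorrow."""
    hour, minute = map(int, time_str.split(":"))
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate
```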

System Integration

Windows Task Scheduler

<!-- Task Scheduler XML (fragment) -->
<Task>
  <Triggers>
    <CalendarTrigger>
      <StartBoundary>2025-01-01T02:00:00</StartBoundary>
      <ScheduleByDay>
        <DaysInterval>1</DaysInterval>
      </ScheduleByDay>
    </CalendarTrigger>
  </Triggers>
  <Actions>
    <Exec>
      <Command>wareflow</Command>
      <Arguments>run</Arguments>
      <WorkingDirectory>C:\path\to\project</WorkingDirectory>
    </Exec>
  </Actions>
</Task>

Linux Cron

# /etc/cron.d/wareflow
# Run daily at 2 AM
0 2 * * * cd /path/to/project && wareflow run >> .wareflow/logs/cron.log 2>&1

# Run weekly report on Fridays at 8 AM
0 8 * * 5 cd /path/to/project && wareflow export --consolidated --output reports/weekly/

systemd Timer (Linux)

# /etc/systemd/system/wareflow.service
[Unit]
Description=Wareflow Analysis Pipeline
After=network.target

[Service]
Type=oneshot
WorkingDirectory=/path/to/project
ExecStart=/usr/local/bin/wareflow run

[Install]
WantedBy=multi-user.target

# /etc/systemd/system/wareflow.timer
[Unit]
Description=Run Wareflow Pipeline Daily
Requires=wareflow.service

[Timer]
OnCalendar=*-*-* 02:00:00
Persistent=true

[Install]
WantedBy=timers.target

Monitoring and Logging

Pipeline History

-- Track pipeline executions
CREATE TABLE pipeline_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_name TEXT,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    status TEXT,  -- 'success', 'failed', 'partial'
    import_rows INTEGER,
    export_files TEXT,
    error_message TEXT
);

Log Files

.wareflow/
├── logs/
│   ├── pipeline_20250121_020000.log
│   ├── pipeline_20250120_020000.log
│   └── scheduler.log
└── history.json
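
The `retention_days` setting implies pruning old logs. A sketch, assuming file modification time is a good-enough proxy for log age (the function name is hypothetical):

```python
from datetime import datetime, timedelta
from pathlib import Path

def prune_logs(log_dir: Path, retention_days: int) -> int:
    """Delete pipeline_*.log files older than retention_days; return count removed."""
    cutoff = datetime.now() - timedelta(days=retention_days)
    removed = 0
    for log_file in log_dir.glob("pipeline_*.log"):
        if datetime.fromtimestamp(log_file.stat().st_mtime) < cutoff:
            log_file.unlink()
            removed += 1
    return removed
```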

Error Handling

Retry Logic

# config/schedule.yaml
jobs:
  - name: "daily_pipeline"
    retry:
      max_attempts: 3
      backoff: "exponential"  # or "linear"
      initial_delay: 60  # seconds
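
A retry wrapper honoring those settings could look like this (a sketch, not tied to any retry library; `run_with_retry` is a hypothetical helper the scheduler would call around `run_full_pipeline`):

```python
import time

def run_with_retry(fn, max_attempts=3, backoff="exponential", initial_delay=60):
    """Call fn(); on failure sleep and retry, doubling ("exponential") or
    adding initial_delay ("linear") each time. Re-raises after the last attempt."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(delay)
            delay = delay * 2 if backoff == "exponential" else delay + initial_delay
```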

Alerting

def send_alert(job_name: str, error: Exception):
    """Send alert on pipeline failure (reads the on_failure config block)."""
    on_failure = config.get('notifications', {}).get('on_failure', {})

    if on_failure.get('slack'):
        send_slack_alert(
            webhook_url=on_failure['slack']['webhook_url'],
            channel=on_failure['slack']['channel'],
            message=f"❌ Pipeline {job_name} failed: {error}"
        )

    if on_failure.get('email'):
        send_email_alert(
            to=on_failure['email'],
            subject=f"⚠️ Pipeline Alert: {job_name}",
            body=f"Pipeline execution failed:\n\n{error}"
        )

Success Criteria

  • Scheduler runs pipeline on configured schedule
  • Supports daily, weekly, hourly frequencies
  • Logs all executions with timestamps
  • Sends notifications on success/failure
  • Validates configuration before starting
  • Handles errors gracefully with retries
  • Provides pipeline history

Future Enhancements

  • Web UI: Dashboard for viewing schedule and history
  • Dynamic Scheduling: Adjust schedules based on data availability
  • Distributed Execution: Run jobs across multiple servers
  • Priority Queues: High-priority ad-hoc jobs
  • Resource Limits: Prevent overlapping executions
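
The last bullet (preventing overlapping executions) can be prototyped today with a lock file; a single-host sketch (the class name and lock path are illustrative, and a crashed process would leave a stale lock that needs manual cleanup):

```python
import os
from pathlib import Path

class PipelineLock:
    """Crude single-host mutual exclusion via an O_EXCL lock file."""

    def __init__(self, lock_path: Path):
        self.lock_path = lock_path
        self._fd = None

    def __enter__(self):
        try:
            # O_EXCL makes creation atomic: open fails if the file exists
            self._fd = os.open(self.lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            raise RuntimeError(f"another run holds {self.lock_path}")
        os.write(self._fd, str(os.getpid()).encode())
        return self

    def __exit__(self, *exc):
        os.close(self._fd)
        self.lock_path.unlink(missing_ok=True)
        return False
```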

Dependencies

Required

  • CORE-005 (run command)
  • schedule Python package (or use APScheduler)

Optional

  • SMTP server (for email notifications)
  • Slack webhook (for Slack alerts)

Related Issues

  • Depends on: CORE-005
  • Related to: ARCH-003 (Consolidated Reporting)

Notes

Scheduling and automation is the final piece for production deployment:

  • Enables fire-and-forget operation
  • Reduces manual intervention
  • Ensures timely reports
  • Provides operational monitoring

Key considerations:

  • Reliability: Must run consistently without manual intervention
  • Monitoring: Must detect and alert on failures
  • Idempotency: Must handle re-runs safely
  • Logging: Must maintain audit trail

The scheduler should be:

  • Simple: Easy to configure and understand
  • Robust: Handle errors gracefully
  • Visible: Clear status and logging
  • Flexible: Support different schedules per environment
