Skip to content

Conversation

@mingjerli
Copy link
Owner

Summary

  • Add to_dagster_assets() method to generate Dagster assets from SQL pipeline
  • Add to_dagster_job() method for ops-based Dagster workflows
  • Improve Airflow 3.x compatibility with auto-version detection

Changes

  • to_dagster_assets(): Convert pipeline to Dagster assets with automatic dependency wiring, group names, key prefixes, and metadata support
  • to_dagster_job(): Alternative approach for job-based Dagster workflows
  • Airflow version detection: Auto-detect installed Airflow version (2.x or 3.x)
  • Test suite: 16 comprehensive tests for Dagster integration

Test plan

  • All existing tests pass (698 passed, 11 skipped)
  • Dagster integration tests pass (2 passed when dagster installed, 14 skipped otherwise)
  • Linter passes (ruff check)
  • Tested with enterprise-demo Docker setup

🤖 Generated with Claude Code

mingjerli and others added 2 commits January 11, 2026 18:35
Add support for Dagster as an orchestration target:

- to_dagster_assets(): Generate Dagster assets from pipeline
  - One asset per target table with automatic dependency wiring
  - Support for group names, key prefixes, compute kinds
  - Asset metadata with query_id and table name

- to_dagster_job(): Generate Dagster job from pipeline (ops-based)
  - Alternative approach for job-based workflows

Also improves Airflow 3.x compatibility:
- Auto-detect Airflow version when generating DAGs
- Support explicit version specification via airflow_version parameter

Includes comprehensive test suite (tests/test_dagster_integration.py)
with 16 tests covering asset generation, dependencies, and execution.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Break down large pipeline.py (~2700 lines) using delegation pattern:

- Create orchestrators/ package with:
  - base.py: BaseOrchestrator with shared functionality
  - airflow.py: AirflowOrchestrator for Airflow DAG generation
  - dagster.py: DagsterOrchestrator for Dagster assets/jobs

- Create execution.py with:
  - PipelineExecutor for sync/async pipeline execution

- Update pipeline.py to delegate to new modules:
  - to_airflow_dag() → AirflowOrchestrator.to_dag()
  - to_dagster_assets() → DagsterOrchestrator.to_assets()
  - to_dagster_job() → DagsterOrchestrator.to_job()
  - run() → PipelineExecutor.run()
  - async_run() → PipelineExecutor.async_run()

- Export new classes from clgraph.__init__:
  - AirflowOrchestrator, DagsterOrchestrator, PipelineExecutor

Benefits:
- Cleaner separation of concerns
- Easier to test orchestrators independently
- Reduced pipeline.py size (~400 lines removed)
- Backward compatible - Pipeline methods still work

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@mingjerli mingjerli merged commit 469b643 into main Jan 12, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants