Migrate observability to OpenTelemetry and pipeline to LangGraph#88

Open
rafacm wants to merge 15 commits into main from feature/otel-langgraph-migration

Conversation

@rafacm
Owner

@rafacm rafacm commented Apr 13, 2026

Summary

  • OpenTelemetry: Replace Langfuse-specific instrumentation with OTel SDK. Traces export to any OTLP-compatible backend (Langfuse, Sentry, Jaeger). New RAGTIME_OTEL_* env vars replace RAGTIME_LANGFUSE_*.
  • LangGraph: Replace Django Q2 signal-based pipeline dispatch with a StateGraph that supports autonomous step skipping, recovery routing, and resume-from-failure. Remove django-q2 dependency entirely.
  • LangGraph Studio: Add langgraph.json and server module for local graph visualization via Studio desktop app.

Changes

Phase 1 — OpenTelemetry

  • Created episodes/telemetry.py (@trace_step, @trace_provider, record_llm_input/output)
  • Deleted episodes/observability.py (256 lines of Langfuse-specific code)
  • Updated all step files, providers, agents, settings, .env.sample, configure wizard

Phase 2 — LangGraph Pipeline

  • Created episodes/graph/ package (state, nodes, edges, pipeline, run — 7 new files)
  • 10-node graph: route → scrape → download → transcribe → summarize → chunk → extract → resolve → embed + recovery
  • Entry router inspects episode data for skip-already-done and resume-from-failure
  • Added handle_step_failure_from_graph() for graph-initiated recovery
  • Removed queue_next_step signal handler, Django Q2, Q_CLUSTER config
  • Updated admin to use threading.Thread + run_pipeline()
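
As a rough illustration of the entry router described above, the sketch below returns the first step whose output is missing. Only the step order comes from this PR; the function name `route_entry_sketch` and the per-step field names are hypothetical and are not taken from episodes/graph/edges.py.

```python
from typing import Mapping, Optional

# Step order from the PR description. The second item of each pair is a
# hypothetical field that marks the step as already done.
STEPS = [
    ("scrape", "scraped_html"),
    ("download", "audio_path"),
    ("transcribe", "transcript"),
    ("summarize", "summary"),
    ("chunk", "chunks"),
    ("extract", "entities"),
    ("resolve", "resolved_entities"),
    ("embed", "embeddings"),
]


def route_entry_sketch(episode: Mapping[str, object]) -> Optional[str]:
    """Return the first step whose output is missing, or None if all are done."""
    for step, done_field in STEPS:
        if not episode.get(done_field):
            return step
    return None


# An episode that was scraped and downloaded, then failed while transcribing,
# resumes at the transcribe step.
resume_at = route_entry_sketch({"scraped_html": "<html>", "audio_path": "ep.mp3"})
```

Under these assumptions, skip-already-done and resume-from-failure are the same check: a failed step leaves no output behind, so routing lands on it again.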

Phase 3 — LangGraph Studio

  • langgraph.json + episodes/graph/server.py for Studio integration

Documentation

  • Updated README.md, doc/README.md, CLAUDE.md for new architecture
  • Plan, feature doc, session transcripts, and changelog entry included

Test plan

  • uv run python manage.py test — all 211 tests pass (9 skipped for optional deps)
  • uv run python manage.py check — no issues
  • Run pipeline with RAGTIME_OTEL_EXPORTER=console and verify spans in terminal
  • langgraph dev starts and Studio shows the graph
  • Process an episode end-to-end through the graph

🤖 Generated with Claude Code

rafacm and others added 5 commits April 13, 2026 16:28
Create episodes/telemetry.py with OTel TracerProvider, @trace_step and
@trace_provider decorators, and record_llm_input/output span event
helpers. Replace all @observe_step/@observe_provider usage across step
files and providers. Replace Langfuse context propagation in recovery
agent with OTel spans. Replace Langfuse screenshot media attachments
with OTel span events. Replace RAGTIME_LANGFUSE_* env vars with
RAGTIME_OTEL_* (exporter, endpoint, service name, headers). Update
configure wizard for new OTel fields.

Delete episodes/observability.py (256 lines of Langfuse-specific code).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Create episodes/graph/ package with a LangGraph StateGraph that
orchestrates the ingestion pipeline. The graph has 10 nodes (route,
scrape, download, transcribe, summarize, chunk, extract, resolve,
embed, recovery) with conditional edges for step skipping and recovery
routing. An entry router inspects episode data to determine where to
start, enabling resume-from-failure.

Remove queue_next_step post_save signal handler and Django Q2
dependency. Replace async_task() in admin with threading.Thread +
run_pipeline(). Add handle_step_failure_from_graph() to recovery.py
for graph-initiated recovery. Remove Q_CLUSTER config and django_q
from INSTALLED_APPS.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove ~100 async_task patches across 13 test files — no longer needed
since Django Q2 signal dispatch was removed. Delete test_signals.py
(tested old queue_next_step handler) and test_observability.py (tested
Langfuse-specific code). Create test_telemetry.py for OTel telemetry
tests. Update test_configure.py for OTel fields. Fix test_scraper.py
and test_recovery.py to create ProcessingRun explicitly (previously
created by signal handler). Update test_admin.py to mock threading
instead of async_task.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add langgraph.json server config and episodes/graph/server.py (Django
setup for LangGraph server). Run `langgraph dev` to start the server
and connect via LangGraph Studio desktop app for graph visualization
and execution inspection.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Update README.md: replace Langfuse with OpenTelemetry, Django Q2 with
LangGraph in features, pipeline description, tech stack, and setup.
Update doc/README.md: rewrite pipeline orchestration, recovery handler
reference, and observability section (Langfuse → OTel with OTLP setup).
Update CLAUDE.md: replace Langfuse/Django Q2 with OTel/LangGraph.
Add changelog entry, plan doc, feature doc, and session transcripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@rafacm rafacm force-pushed the feature/otel-langgraph-migration branch from 8d175d6 to 2e69b36 Compare April 13, 2026 14:29
Add langgraph dev command to CLAUDE.md commands section. Update README
Getting Started to include langgraph dev as optional step and clarify
that the pipeline runs in-process via LangGraph (no separate worker).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Copilot AI left a comment

Pull request overview

This PR migrates RAGtime’s pipeline observability from Langfuse-specific instrumentation to OpenTelemetry, and replaces the Django Q2 + post_save signal-driven pipeline dispatch with a LangGraph StateGraph orchestration layer (including LangGraph Studio support).

Changes:

  • Replace Langfuse observability (episodes/observability.py) with an OpenTelemetry-based module (episodes/telemetry.py) and update step/provider instrumentation accordingly.
  • Replace Django Q2 pipeline dispatch (signals + async_task) with a LangGraph pipeline (episodes/graph/*) and update admin-triggered pipeline runs to execute via background threads.
  • Update configuration/docs/tests to reflect new OTel env vars (RAGTIME_OTEL_*) and new orchestration model (LangGraph).

Reviewed changes

Copilot reviewed 49 out of 51 changed files in this pull request and generated 10 comments.

| File | Description |
| --- | --- |
| uv.lock | Updates resolved dependencies: removes django-q2/langfuse, adds langgraph + langchain-core deps and OTel libs. |
| pyproject.toml | Removes django-q2, adds langgraph, opentelemetry-* (and OTLP exporter in observability extra). |
| ragtime/settings.py | Removes django_q/Q_CLUSTER; replaces RAGTIME_LANGFUSE_* with RAGTIME_OTEL_*. |
| .env.sample | Replaces Langfuse env vars with OpenTelemetry env vars. |
| core/management/commands/_configure_helpers.py | Updates configure wizard sections/fields to OpenTelemetry settings. |
| core/tests/test_configure.py | Updates wizard tests to match new OTel prompts/inputs. |
| episodes/telemetry.py | New OTel tracing setup + decorators + span event helpers for LLM input/output. |
| episodes/observability.py | Deleted Langfuse-specific observability implementation. |
| episodes/providers/openai.py | Switches to plain openai.OpenAI and OTel span events/decorators. |
| episodes/scraper.py | Replaces @observe_step with @trace_step. |
| episodes/downloader.py | Not directly changed in diff shown, but now invoked by graph nodes instead of signals/Q2. |
| episodes/transcriber.py | Replaces @observe_step with @trace_step. |
| episodes/summarizer.py | Replaces @observe_step with @trace_step. |
| episodes/extractor.py | Replaces @observe_step with @trace_step. |
| episodes/resolver.py | Replaces @observe_step with @trace_step. |
| episodes/signals.py | Removes post_save pipeline dispatch; keeps only step_completed/step_failed. |
| episodes/apps.py | Stops auto-connecting recovery handler; calls telemetry setup on app ready. |
| episodes/recovery.py | Adds handle_step_failure_from_graph() for graph-initiated recovery. |
| episodes/admin.py | Replaces Q2 async_task usage with threading.Thread to run graph + recovery tasks. |
| episodes/agents/agent.py | Reworks recovery agent tracing to use OTel spans and flush OTel provider. |
| episodes/agents/tools.py | Records screenshots as OTel span events (replacing Langfuse media attachment). |
| episodes/graph/state.py | Defines EpisodeState schema for LangGraph state passing. |
| episodes/graph/edges.py | Entry routing + after-step conditional routing (including recovery routing). |
| episodes/graph/nodes.py | Wraps existing step functions as graph nodes + adds embed/recovery nodes. |
| episodes/graph/pipeline.py | Defines and compiles the ingestion StateGraph. |
| episodes/graph/run.py | Adds run_pipeline() entry point that creates runs and invokes the compiled graph. |
| episodes/graph/server.py | Ensures django.setup() is run before LangGraph Studio imports the graph. |
| langgraph.json | Adds LangGraph Studio/server configuration pointing to the ingestion graph. |
| episodes/tests/test_telemetry.py | New tests for OTel helper behaviors and calling conventions. |
| episodes/tests/test_observability.py | Deleted Langfuse observability tests. |
| episodes/tests/test_signals.py | Deleted tests for now-removed Q2/signal dispatch behavior. |
| episodes/tests/test_admin.py | Updates admin tests to expect thread-based background execution. |
| episodes/tests/test_scraper.py | Updates tests for new run creation assumptions and status-based behavior. |
| episodes/tests/test_transcribe.py | Removes Q2 signal patching; calls step function directly. |
| episodes/tests/test_summarize.py | Removes Q2 signal patching; calls step function directly. |
| episodes/tests/test_chunk.py | Removes Q2 signal patching; calls step function directly. |
| episodes/tests/test_extract.py | Removes Q2 signal patching; calls step function directly. |
| episodes/tests/test_resolve.py | Removes Q2 signal patching; calls step function directly. |
| episodes/tests/test_download.py | Removes Q2 signal patching; calls step function directly. |
| episodes/tests/test_events.py | Removes Q2 signal patching; validates event emission directly. |
| episodes/tests/test_models.py | Removes Q2 signal patching from model tests. |
| episodes/tests/test_recovery.py | Updates recovery tests to manually connect signals where needed. |
| episodes/tests/test_agent_resume.py | Removes Q2 patching assumptions around resume behaviors. |
| README.md | Updates documentation to reference OpenTelemetry + LangGraph; removes qcluster instructions. |
| doc/README.md | Updates pipeline + observability docs for LangGraph and OpenTelemetry. |
| CHANGELOG.md | Adds a 2026-04-13 entry describing the migrations. |
| AGENTS.md | Updates agent guidance to reflect LangGraph + OpenTelemetry architecture. |
| doc/plans/2026-04-13-otel-langgraph-migration.md | Adds migration plan documentation. |
| doc/features/2026-04-13-otel-langgraph-migration.md | Adds feature documentation for the migration. |
| doc/sessions/2026-04-13-otel-langgraph-migration-implementation-session.md | Adds implementation session transcript. |
| doc/sessions/2026-04-13-otel-langgraph-migration-planning-session.md | Adds planning session transcript. |

rafacm and others added 4 commits April 13, 2026 16:43
The langgraph CLI (langgraph dev) is a separate package from the
langgraph library. Add langgraph-cli[inmem] as a studio optional
dependency group. Update all docs to use `uv run langgraph dev`
and reference `uv sync --extra studio` for installation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The langgraph CLI requires a 'dependencies' key in the config to know
what to install. Use ["."] to reference the current project, which
brings in all dependencies from pyproject.toml.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
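
For readers unfamiliar with the config shape, here is a hedged sketch of what a langgraph.json with the `dependencies` key might contain, built in Python so it can be checked. The `["."]` value comes from the commit message above; the graph id `ingestion` and the `server.py:graph` attribute name are assumptions, not values read from the PR's file.

```python
import json

# Sketch of a langgraph.json payload. Only "dependencies": ["."] is confirmed
# by the commit message; the graph id and the ":graph" variable are hypothetical.
config = {
    "dependencies": ["."],
    "graphs": {
        "ingestion": "./episodes/graph/server.py:graph",
    },
}

rendered = json.dumps(config, indent=2)
print(rendered)
```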
Point users to http://localhost:8000/admin/ after starting the server.
Note that LangGraph Studio works locally without a LangSmith API key
and the warning banner can be dismissed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1. Add start_from param to run_pipeline/route_entry so admin reprocess
   overrides data-based routing instead of silently skipping steps
2. Add _close_connections() for DB connection management in background
   threads (gated on non-main thread to avoid breaking tests)
3. Route ALL step failures to recovery node (not just scraping/downloading)
   so RecoveryAttempt records and human escalation work for every step
4. Doc now accurate after fix #3
5. Preserve original error_type/http_status/exception_class from
   PipelineEvent using dataclasses.replace() in handle_step_failure_from_graph
6. Fix telemetry.py module docstring re: OTel import when disabled
7. embed_node() fails fast instead of marking READY without embeddings
8. Add test_graph.py with 12 tests for route_entry, after_step,
   after_recovery, and compiled graph integration
9. Add comment to apps.py explaining signals kept as extension points
10. Fix _build_exporter() docstring

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
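
Item 5 relies on `dataclasses.replace()`, which returns a copy of a dataclass instance with selected fields overridden. A minimal sketch of that idea follows; `FailureSketch` and `restore_original_error` are hypothetical stand-ins, and only the three field names come from the commit message.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class FailureSketch:
    """Hypothetical stand-in for the failure payload handed to recovery."""
    step: str
    error_type: str
    http_status: Optional[int]
    exception_class: str


def restore_original_error(generic: FailureSketch, recorded: FailureSketch) -> FailureSketch:
    """Copy the three error fields from the recorded event onto the generic payload."""
    return replace(
        generic,
        error_type=recorded.error_type,
        http_status=recorded.http_status,
        exception_class=recorded.exception_class,
    )


generic = FailureSketch("scrape", "unknown", None, "Exception")
recorded = FailureSketch("scrape", "http", 403, "HTTPError")
restored = restore_original_error(generic, recorded)
```

Because `replace()` builds a new object, the generic payload is left untouched, which also works with frozen dataclasses.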
@rafacm rafacm requested a review from Copilot April 14, 2026 09:40

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@rafacm
Owner Author

rafacm commented Apr 14, 2026

High: this change appears to remove the primary ingestion trigger when a new episode is created.

episodes/signals.py now only defines step_completed / step_failed and no longer contains the old post_save-driven queue_next_step hook. I also don’t see a replacement on the normal add path in EpisodeAdmin — the only new pipeline entry point I found is the reprocess action’s background thread in episodes/admin.py:306-316.

Impact: creating an Episode from the admin now looks like it will leave the row in pending indefinitely instead of starting the pipeline. That regresses the documented workflow in README.md:134-136 ("submit episode URLs, monitor pipeline progress...").

The focused Django test slice passed, but this behavior is no longer covered because episodes/tests/test_signals.py was deleted in this PR, so there is currently no test protecting automatic ingestion on create.

@rafacm
Owner Author

rafacm commented Apr 14, 2026

Medium: admin-triggered pipeline/recovery work is now launched via raw daemon threads, which makes the "queued" behavior non-durable.

The reprocess action starts threading.Thread(..., daemon=True) in episodes/admin.py:312-316, and recovery retry does the same in episodes/admin.py:686-690.

Impact: if the Django process reloads, exits, or the worker is recycled after the admin response returns, daemon threads can be terminated immediately and the background work is silently lost. That is a reliability regression from the previous task-queue model, because the UI reports success (Queued ...) without any persistence or retry semantics.

The current tests only mock threading.Thread, so they verify that a thread is started, not that the work survives process restarts.

rafacm and others added 2 commits April 14, 2026 16:16
Generated by `langgraph dev` and should not be tracked.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add save_model() to EpisodeAdmin to auto-start the ingestion pipeline
  when a new episode is created, restoring the behavior removed with the
  Django Q2 to LangGraph migration.
- Remove daemon=True from background threads (reprocess, recovery retry,
  and new auto-start) so work completes during graceful process shutdown
  instead of being silently killed.
- Add test for auto-ingestion on episode create.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@rafacm
Owner Author

rafacm commented Apr 14, 2026

Fixed in 9134c90.

  1. Auto-ingestion restored — added save_model() override to EpisodeAdmin that starts the pipeline in a background thread when a new episode is created (change=False). This restores the auto-start behavior that was lost when the post_save signal handler was removed during the Django Q2 → LangGraph migration.

  2. Test added: test_create_episode_auto_starts_pipeline verifies that creating an episode via the admin triggers _run_pipeline_task.

@rafacm
Owner Author

rafacm commented Apr 14, 2026

Fixed in 9134c90.

Removed daemon=True from all three threading.Thread usages (reprocess, recovery retry, and the new auto-start on create). Non-daemon threads block process exit until they complete, so work in progress survives graceful shutdown (e.g., runserver reload, gunicorn graceful restart) instead of being silently killed.

This doesn't provide full persistence across hard kills or crashes — that would require re-introducing a task queue. But it eliminates the most common failure mode (work lost on reload) while keeping the implementation simple.
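
The daemon distinction can be seen in a small standalone sketch. `run_pipeline_stub` is a hypothetical placeholder for the real pipeline task; the only point is that a thread created from the main thread without `daemon=True` is non-daemon, and the interpreter waits for such threads before exiting.

```python
import threading
import time

completed = []


def run_pipeline_stub(episode_id: int) -> None:
    """Hypothetical stand-in for the pipeline task; just simulates some work."""
    time.sleep(0.05)
    completed.append(episode_id)


# No daemon=True: the thread inherits daemon=False from the main thread,
# so a normal interpreter exit waits for it to finish.
worker = threading.Thread(target=run_pipeline_stub, args=(42,))
worker.start()

# join() is only here so the result can be inspected right away.
worker.join()
```

As the comment above says, this covers graceful shutdown only; a hard kill still loses the work.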

rafacm and others added 2 commits April 14, 2026 16:52
Extract Jaeger setup from inline step to its own subsection with
Docker command and .env config. Add comprehensive Langfuse subsection
covering self-hosted setup (with port conflict tip), API key generation,
base64 auth encoding, and Langfuse Cloud regional endpoints.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
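
The base64 auth encoding mentioned above is ordinary HTTP Basic authentication. A minimal sketch, assuming the credential is the public key and secret key joined by a colon; the key values are placeholders, and how the result is placed into the RAGTIME_OTEL_* headers setting is not shown here because the exact format is not stated in this PR.

```python
import base64


def basic_auth_value(public_key: str, secret_key: str) -> str:
    """Build the value of an HTTP Basic Authorization header."""
    raw = f"{public_key}:{secret_key}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


# Placeholder keys for illustration only.
header_value = basic_auth_value("pk-example", "sk-example")
```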
Remove RAGTIME_OTEL_ENABLED flag — tracing now activates automatically
when RAGTIME_OTEL_EXPORTER is set to 'otlp' or 'console'. When unset
or 'none', the OTel API provides a no-op tracer with zero overhead.

Move opentelemetry-exporter-otlp-proto-http from optional to main
dependencies and remove the 'observability' extras group entirely.
For a RAG app chaining multiple LLM calls, observability is essential
infrastructure, not an optional add-on.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
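
The activation rule in the first paragraph of this commit can be sketched as a small selector. This is not the PR's `_build_exporter()`; `build_exporter_sketch` is a hypothetical name, and raising on unknown values is an assumption made here. The OTel imports are deferred so that the unset/'none' path touches nothing beyond the standard library.

```python
from typing import Optional


def build_exporter_sketch(mode: Optional[str], endpoint: Optional[str] = None):
    """Return a span exporter for 'console' or 'otlp', or None when tracing is off."""
    normalized = (mode or "none").strip().lower()
    if normalized == "none":
        # No exporter: callers fall back to the OTel API's no-op tracer.
        return None
    if normalized == "console":
        from opentelemetry.sdk.trace.export import ConsoleSpanExporter
        return ConsoleSpanExporter()
    if normalized == "otlp":
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
        # endpoint=None lets the exporter use its own defaults/env configuration.
        return OTLPSpanExporter(endpoint=endpoint)
    # Assumption: reject unknown values instead of silently disabling tracing.
    raise ValueError(f"Unsupported exporter mode: {mode!r}")
```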
@rafacm
Owner Author

rafacm commented Apr 14, 2026

High: the auto-start-on-create fix still has a transaction race.

In episodes/admin.py:182-189, save_model() starts a background thread immediately after super().save_model(...):

```python
threading.Thread(
    target=_run_pipeline_task,
    args=(obj.pk,),
).start()
```

This thread uses a separate DB connection, so it can run before the admin request's transaction commits and fail to see the newly created Episode row at all. In that case _run_pipeline_task() / run_pipeline() can raise Episode.DoesNotExist, and the new episode is left stuck without ingestion starting.

The safe pattern here is to schedule the thread from transaction.on_commit(...) so the worker only starts after the row is committed and visible to other connections.

The new test does not catch this because it mocks threading.Thread rather than exercising the real post-commit behavior.
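
The visibility rule behind this race can be reproduced without Django. The sketch below uses the standard-library sqlite3 module as a stand-in: one connection plays the admin request, a second plays the worker thread. Table and column names are hypothetical; the point is only that a second connection does not see a row until the first one commits.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "race.sqlite3")

    # Connection used by the "admin request".
    request_conn = sqlite3.connect(db_path)
    request_conn.execute("CREATE TABLE episode (id INTEGER PRIMARY KEY, url TEXT)")
    request_conn.commit()

    # Separate connection used by the "worker thread".
    worker_conn = sqlite3.connect(db_path)

    # The INSERT opens a transaction that is not committed yet.
    request_conn.execute("INSERT INTO episode (url) VALUES ('https://example.com/ep1')")
    seen_before_commit = len(worker_conn.execute("SELECT id FROM episode").fetchall())

    # Once the request commits, the row becomes visible to the worker.
    request_conn.commit()
    seen_after_commit = len(worker_conn.execute("SELECT id FROM episode").fetchall())

    request_conn.close()
    worker_conn.close()
```

A worker that starts in the window before the commit is the case where a lookup by primary key would find nothing, which is why deferring the thread start until after commit closes the gap.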

Use transaction.on_commit() to defer the background thread until the
Episode row is committed and visible to other DB connections. Without
this, the worker thread could run before the admin request's transaction
commits and fail with Episode.DoesNotExist.

Update test to use captureOnCommitCallbacks(execute=True) to verify
the on_commit callback fires correctly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@rafacm
Owner Author

rafacm commented Apr 14, 2026

Fixed in b16c617.

Wrapped the background thread start in transaction.on_commit() so the worker only starts after the Episode row is committed and visible to other DB connections. Updated the test to use captureOnCommitCallbacks(execute=True) to verify the on_commit behavior.

2 participants