
feat: Implement RQ-based PyMuPDF integration for async PDF processing#135

Merged: JonnyTran merged 14 commits into feat/ocr-rq-jobs-workflow from feat/rq-pymupdf-integration on Aug 20, 2025

@priyankeshh
Contributor

🎯 Overview

This PR implements Phase 2 of the RQ-PyMuPDF integration plan, adding direct Redis Queue (RQ) communication between extralit-server and extralit-hf-space workers for asynchronous PDF processing with AGPL compliance.

🏗️ Architecture Changes

Before (HTTP-based)

extralit-server → HTTP → extralit-hf-space FastAPI → PyMuPDF

After (RQ-based)

extralit-server → RQ (pdf_queue) → extralit-hf-space worker → PyMuPDF

✨ Key Features

  • 🚀 Async Processing: Non-blocking PDF extraction using Redis queues
  • ⚖️ AGPL Compliance: Strict separation of Apache 2.0 and AGPL code via RQ workers
  • 🔄 Fallback Mechanism: HTTP fallback when Redis is unavailable
  • 📊 Smart Policies: File size limits and extraction configuration
  • 🎛️ Configurable: Environment-driven RQ enable/disable

📁 Files Added

Core RQ Integration

  • src/extralit_server/contexts/ocr/rq_client.py - Direct RQ client for pdf_queue
  • src/extralit_server/jobs/pdf_extraction_jobs.py - PDF extraction orchestration with RQ polling

Enhanced Pipeline

  • Updated src/extralit_server/jobs/document_jobs.py - Integrated RQ extraction into document processing
  • Updated pyproject.toml - Added Redis dependency

🔧 Configuration

Environment Variables

# Enable/disable RQ processing
PYMUPDF_RQ_ENABLED=true
PYMUPDF_RQ_FALLBACK_HTTP=true

# Redis connection
REDIS_URL=redis://localhost:6379/0
PYMUPDF_EXTRACTION_QUEUE=pdf_queue

# Fallback HTTP service
PYMUPDF_SERVICE_URL=http://localhost:7860
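
As a rough illustration of how these variables might be consumed, the sketch below reads them into a small settings object. The class name `PyMuPDFSettings`, the helper `_env_flag`, and the choice of defaults (taken from the example values above) are assumptions for illustration, not the PR's actual settings code.

```python
import os
from dataclasses import dataclass


def _env_flag(name: str, default: bool) -> bool:
    """Treat "1", "true", "yes", "on" (any case) as True; unset means default."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class PyMuPDFSettings:  # hypothetical name, not taken from the PR
    rq_enabled: bool
    fallback_http: bool
    redis_url: str
    queue_name: str
    service_url: str


def load_settings() -> PyMuPDFSettings:
    """Read the variables listed above; the defaults here are assumed."""
    return PyMuPDFSettings(
        rq_enabled=_env_flag("PYMUPDF_RQ_ENABLED", True),
        fallback_http=_env_flag("PYMUPDF_RQ_FALLBACK_HTTP", True),
        redis_url=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        queue_name=os.environ.get("PYMUPDF_EXTRACTION_QUEUE", "pdf_queue"),
        service_url=os.environ.get("PYMUPDF_SERVICE_URL", "http://localhost:7860"),
    )
```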

🧪 Testing

Quick Test

# Start Redis
docker run -d -p 6379:6379 redis:alpine

# Start extralit-hf-space worker (separate repo)
cd extralit-hf-space
git checkout feat/rq-pymupdf-integration
python -m src.worker &

# Test RQ integration
cd extralit-server
python -c "
from src.extralit_server.contexts.ocr.rq_client import is_redis_available
print(f'Redis available: {is_redis_available()}')
"

Integration Testing

The RQ client includes comprehensive testing utilities for:

  • ✅ Redis connectivity validation
  • ✅ Job enqueueing and status monitoring
  • ✅ HTTP fallback mechanism
  • ✅ Error handling and timeout scenarios

🔄 Backward Compatibility

  • ✅ Existing workflows unchanged - HTTP extraction still available as fallback
  • ✅ Configuration-driven - RQ can be disabled via environment variables
  • ✅ Graceful degradation - Automatic fallback to HTTP when Redis unavailable
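
The degradation path described above can be sketched roughly as follows. The function name and its callable parameters are hypothetical; the sketch also assumes that HTTP becomes the primary path when RQ is disabled, which the description implies but does not state outright.

```python
from typing import Callable, Tuple, Type


def extract_with_fallback(
    pdf_bytes: bytes,
    rq_extract: Callable[[bytes], str],
    http_extract: Callable[[bytes], str],
    rq_enabled: bool = True,
    fallback_http: bool = True,
    # With redis-py you would likely pass (redis.exceptions.ConnectionError,)
    # here, since that class does not inherit from the builtin ConnectionError.
    connection_errors: Tuple[Type[BaseException], ...] = (ConnectionError,),
) -> str:
    """Prefer the RQ path; use HTTP when RQ is disabled or Redis is unreachable."""
    if not rq_enabled:
        # Assumption: with RQ switched off, HTTP is the only extraction path.
        return http_extract(pdf_bytes)
    try:
        return rq_extract(pdf_bytes)
    except connection_errors:
        if not fallback_http:
            raise  # fallback disabled: surface the Redis failure to the caller
        return http_extract(pdf_bytes)
```

Only connection errors trigger the fallback in this sketch; a job that fails inside the worker would still propagate, so a genuinely broken PDF is not silently retried over HTTP.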

📋 Implementation Details

Direct RQ Communication

  • Uses string job reference: "src.jobs.extraction_jobs.extract_pdf_markdown_job"
  • Connects to same pdf_queue as extralit-hf-space worker
  • No HTTP dependency in RQ workflow (pure Redis communication)
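
A minimal sketch of enqueueing by string reference is shown below. RQ's `Queue.enqueue` accepts a dotted path in place of a callable and the worker imports it on its side, so the server process never has to import the AGPL module. The job arguments (`pdf_bytes`, `filename`), the 600-second timeout, and the `QueueLike` protocol are assumptions made so the sketch runs without Redis.

```python
from typing import Any, Protocol


class QueueLike(Protocol):
    """The one method of rq.Queue this sketch relies on."""

    def enqueue(self, f: Any, *args: Any, **kwargs: Any) -> Any: ...


# Dotted path from the PR description; it is resolved by the worker, not here.
JOB_REF = "src.jobs.extraction_jobs.extract_pdf_markdown_job"


def enqueue_pdf_extraction(
    queue: QueueLike, pdf_bytes: bytes, filename: str, timeout_s: int = 600
) -> Any:
    """Enqueue the extraction job without importing the worker's code."""
    # The positional arguments are assumed; the real job signature may differ.
    return queue.enqueue(JOB_REF, pdf_bytes, filename, job_timeout=timeout_s)


# With a real Redis instance, the queue could be built like this:
#   from redis import Redis
#   from rq import Queue
#   queue = Queue("pdf_queue", connection=Redis.from_url("redis://localhost:6379/0"))
```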

Smart Extraction Policies

def should_extract_text(filename: str, file_metadata: dict) -> bool:
    """Smart extraction policies based on file type and size."""
    if not filename.lower().endswith('.pdf'):
        return False
    
    file_size = file_metadata.get('size', 0)
    max_size = 50 * 1024 * 1024  # 50MB limit
    return file_size <= max_size
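
To make the policy's edge cases concrete, here is a short usage sketch that restates the function and runs it on a few inputs. The sample filenames and metadata are invented for illustration. One point worth noticing: a PDF whose metadata has no `size` key is treated as size 0 and therefore accepted.

```python
def should_extract_text(filename: str, file_metadata: dict) -> bool:
    """Same policy as above: PDFs only, at most 50 MB."""
    if not filename.lower().endswith(".pdf"):
        return False
    return file_metadata.get("size", 0) <= 50 * 1024 * 1024


LIMIT = 50 * 1024 * 1024

# Invented sample inputs covering the extension check and the size boundary.
cases = {
    "paper.PDF": {"size": 1024},       # extension match is case-insensitive
    "notes.txt": {"size": 1024},       # not a PDF
    "exact.pdf": {"size": LIMIT},      # exactly at the limit
    "big.pdf": {"size": LIMIT + 1},    # one byte over the limit
    "unknown.pdf": {},                 # missing size defaults to 0
}

results = {name: should_extract_text(name, meta) for name, meta in cases.items()}
for name, accepted in results.items():
    print(f"{name}: {accepted}")
```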

Error Handling

  • Timeout handling: Configurable job timeouts with polling
  • Redis failures: Automatic HTTP fallback
  • Job failures: Detailed error reporting and retry logic
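
The timeout-with-polling behaviour could look something like the sketch below. It is not the PR's implementation: `wait_for_job` and its parameters are hypothetical, and the status getter is injected so the loop can be exercised without Redis. With RQ, the getter would presumably wrap `Job.fetch(job_id, connection=...).get_status()`.

```python
import time
from typing import Callable

# Status names RQ uses for jobs that will not make further progress.
TERMINAL_STATUSES = ("finished", "failed", "stopped", "canceled")


def wait_for_job(
    get_status: Callable[[], str],
    timeout_s: float = 300.0,
    poll_interval_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until the job reaches a terminal status or the timeout elapses."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still '{status}' after {timeout_s} seconds")
        sleep(poll_interval_s)
```

The caller decides what to do with a `"failed"` result (report the error, retry, or fall back); the loop itself only distinguishes "done" from "still running".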

🔗 Related Changes

This PR works in conjunction with:

  • extralit-hf-space feat/rq-pymupdf-integration branch - Pure RQ worker implementation
  • Maintains compatibility with existing PyMuPDF HTTP service

Branch: feat/rq-pymupdf-integration
Type: Feature
Breaking Changes: None
Dependencies: Redis server, extralit-hf-space RQ worker

@priyankeshh priyankeshh changed the title from "Feat/rq pymupdf integration" to "feat: Implement RQ-based PyMuPDF integration for async PDF processing" on Aug 17, 2025
Comment on lines +38 to +43
def get_redis_connection():
    """Get or create Redis connection."""
    global _redis
    if _redis is None:
        _redis = redis.from_url(REDIS_URL)
    return _redis
Member


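You can import the shared connection instead: from extralit_server.jobs.queues import REDIS_CONNECTION

import redis
from redis.cluster import RedisCluster

from extralit_server.settings import settings

# Use a cluster client when configured; otherwise a single-node client.
if settings.redis_use_cluster:
    REDIS_CONNECTION = RedisCluster.from_url(settings.redis_url)
else:
    REDIS_CONNECTION = redis.from_url(settings.redis_url)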

Comment thread extralit-server/pyproject.toml Outdated
Comment thread extralit-server/src/extralit_server/api/handlers/v1/jobs.py
Member

@JonnyTran JonnyTran left a comment


see comments

- Fix trailing whitespace and formatting issues
- Apply ruff formatting to RQ client and text modules
- Ensure code follows project style guidelines
- Fix trailing whitespace and code formatting issues
- Apply ruff formatting to pdf_extraction_jobs.py and document_jobs.py
- Ensure code follows project style guidelines
@priyankeshh priyankeshh force-pushed the feat/rq-pymupdf-integration branch from c4bc4db to 144c517 on August 18, 2025 20:19
@priyankeshh priyankeshh changed the base branch from develop to feat/ocr-rq-jobs-workflow August 18, 2025 20:25
Comment thread extralit-server/src/extralit_server/api/handlers/v1/jobs.py Outdated
Comment thread extralit-server/src/extralit_server/contexts/ocr/text.py Outdated
…DF workflow

- Updated `pyproject.toml` to use `rq` version 2.4.1.
- Removed the `rq_client.py` file and its associated functions to streamline job management.
- Adjusted job handling in `jobs.py` and `pdf.py` to reflect the removal of the RQ client, ensuring proper job enqueueing and dependency management.
- Cleaned up unused imports and improved type handling in `text.py`.
Comment thread extralit-server/src/extralit_server/jobs/pdf_extraction_jobs.py Outdated
Member


I don't think we need this file, since it's already being called in start_pdf_workflow

- Deleted the `pdf_extraction_jobs.py` file to simplify job orchestration.
- Removed `PDF_QUEUE` from the job queues, transitioning to `DEFAULT_QUEUE` for PDF extraction tasks.
- Updated `text.py` to eliminate unused functions and improve code clarity.
- Adjusted the PDF workflow to reflect changes in job handling and ensure proper integration with the new structure.
@JonnyTran JonnyTran marked this pull request as ready for review August 20, 2025 06:43
@JonnyTran JonnyTran requested a review from a team as a code owner August 20, 2025 06:43
@JonnyTran JonnyTran merged commit f7ab092 into feat/ocr-rq-jobs-workflow Aug 20, 2025
16 checks passed
@JonnyTran
Member

This pull request introduces a new job queue specifically for PDF OCR processing and updates the PDF workflow to utilize this new queue for text extraction jobs. The main goal is to better organize and route jobs related to PDF OCR, separating them from the default queue, and to prepare for future workflow enhancements.

Job queue improvements:

  • Added a new PDF_OCR_QUEUE to queues.py for handling PDF OCR-related jobs, ensuring these tasks are managed separately from other job types.

Workflow enhancements:

  • Updated the PDF workflow in pdf.py to enqueue PyMuPDF-based text extraction jobs to the new PDF_OCR_QUEUE, with dependencies and job configuration, instead of the default queue.
  • Modified the workflow logic to store the new text extraction job ID in the workflow record, ensuring traceability and easier management of job dependencies.
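
The two workflow changes above (enqueueing with a dependency, then recording the job ID) might be sketched as follows. RQ's `enqueue` accepts `depends_on` as a job or job ID, so the extraction only starts after the upstream job finishes. The function name, the dict-based workflow record, the `text_extraction_job_id` key, the `document_id` argument, and the timeout are all assumptions; the actual code in pdf.py may look quite different.

```python
from typing import Any, Dict


def enqueue_text_extraction(
    ocr_queue: Any,
    workflow: Dict[str, Any],
    upstream_job_id: str,
    document_id: str,
) -> Any:
    """Enqueue extraction behind an upstream job and record its ID."""
    job = ocr_queue.enqueue(
        # Job reference from the PR description; job arguments are assumed.
        "src.jobs.extraction_jobs.extract_pdf_markdown_job",
        document_id,
        depends_on=upstream_job_id,  # RQ defers the job until this one succeeds
        job_timeout=600,  # assumed value
    )
    # Keeping the ID on the workflow record lets status queries find the job.
    workflow["text_extraction_job_id"] = job.id
    return job
```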

JonnyTran added a commit that referenced this pull request Aug 22, 2025
* design

* Update rq dependency to version 2.4.1 and adjust pdm.lock accordingly

* design

* design v2

* design v3

* design v3

* design v4

* design v5

* design v5

* 1.1 Create combined PDF processing job function

* Refactor PDF processing job function for improved efficiency

* 1.5 Update process_bulk_upload function

* refactor

* task 2.4: Add workflow status monitoring

* fix AsyncSessionLocal

* Refactor database migration and PDF workflow to enhance structure and clarity

* Refactor PDF workflow functions to use workspace name instead of ID and improve document handling for uploads without associated files

* fix

* Update metadata fields to be optional and improve error handling in PDF analysis

* fix REDIS_CONNECTION arg

* refactor

* latest

* Update metadata fields in Analysis, Preprocessing, Text Extraction, Table Extraction, and Embedding classes to be optional strings, enhancing flexibility in handling completion timestamps.

* feat: Implement RQ-based PyMuPDF integration for async PDF processing (#135)

* fix: apply code formatting and linting fixes

- Fix trailing whitespace and formatting issues
- Apply ruff formatting to RQ client and text modules
- Ensure code follows project style guidelines

* fix: apply formatting and linting to PDF extraction pipeline

- Fix trailing whitespace and code formatting issues
- Apply ruff formatting to pdf_extraction_jobs.py and document_jobs.py
- Ensure code follows project style guidelines

* fix: apply final ruff formatting fixes

* fix: update default extraction queue name to 'pdf_queue'

* refactor: remove unused PDF extraction logic from document upload job

* added pdf extraction

* feat: add PDF_QUEUE for PyMuPDF extraction jobs

* feat: integrate PyMuPDF extraction job into PDF workflow

* fix: add missing Optional import in jobs.py and add queue test

* test: organize all test scripts into tests/ folder with comprehensive runners

* refactor: update RQ client integration and clean up job handling in PDF workflow

- Updated `pyproject.toml` to use `rq` version 2.4.1.
- Removed the `rq_client.py` file and its associated functions to streamline job management.
- Adjusted job handling in `jobs.py` and `pdf.py` to reflect the removal of the RQ client, ensuring proper job enqueueing and dependency management.
- Cleaned up unused imports and improved type handling in `text.py`.

* refactor: remove PDF_QUEUE and streamline PDF extraction workflow

- Deleted the `pdf_extraction_jobs.py` file to simplify job orchestration.
- Removed `PDF_QUEUE` from the job queues, transitioning to `DEFAULT_QUEUE` for PDF extraction tasks.
- Updated `text.py` to eliminate unused functions and improve code clarity.
- Adjusted the PDF workflow to reflect changes in job handling and ensure proper integration with the new structure.

* renamed PDF_QUEUE to PDF_OCR_QUEUE

---------

Co-authored-by: JonnyTran <nhat.c.tran@gmail.com>

* feat: Add OCR_QUEUE for improved job handling in PDF workflows

- Introduced OCR_QUEUE to manage OCR-related jobs.
- Updated worker options to include OCR_QUEUE for enhanced queue listening.
- Refactored PDF workflow to utilize OCR_QUEUE for text extraction jobs, replacing the previous PDF_OCR_QUEUE reference.

* refactor: Rename ImportHistory table to 'imports' for consistency

- Updated the ImportHistory model and associated database migration to reflect the new table name 'imports'.
- Adjusted references in the database model and migration scripts accordingly.

* refactoring

* refactor

* renames

* requirement changes

* use group instead of job ids

* updated DocumentWorkflow class

* rq.Group updated tasks and design

* Refactor workflows to use RQ Groups for job tracking

- Replace job_ids with group_id and status in DocumentWorkflow
- Implement workflow status and job queries using RQ Groups
- Update process_bulk_upload and create_document_workflow for group-based orchestration
- Add workflow status helpers and resumability methods to model
- Update API schemas and Alembic migration for new fields

* Add TextExtractionMetadata schema and update bulk upload processing

- Introduced TextExtractionMetadata class for tracking text extraction results.
- Updated process_bulk_upload to use type hints for better clarity.
- Refactored document upload handling to streamline document creation and job enqueuing.
- Changed job queue from DEFAULT_QUEUE to OCR_QUEUE for text extraction tasks.

* Update job_ids type in DocumentsBulkResponse and refactor create_document_workflow return type

- Changed job_ids in DocumentsBulkResponse from dict[str, Any] to dict[str, str] for better clarity.
- Refactored create_document_workflow to return an RQ Group instead of a dictionary, simplifying the workflow tracking process.

* 2.1 Implement RQ Groups-based job querying

* Add CLI and API support for PDF workflow management

- Implement FastAPI endpoints for workflow start, status, restart, and list
- Add Pydantic schemas for workflow API requests and responses
- Integrate workflow router into API routes
- Add CLI commands for workflow start, status, restart, and list with Rich output
- Extend workflow context for RQ Groups operations and error handling
- Add unit tests for jobs API with RQ Groups integration

* fixes

* tests

* fix tests

---------

Co-authored-by: Priyankesh <priyankeshom@gmail.com>