An AI-powered medical question answering service with async task processing, multi-LLM support, and real-time statistics.
- Async Architecture: Built with FastAPI and asyncio for high-concurrency I/O-bound workloads
- Multi-LLM Support: Works with Claude (Anthropic), DeepSeek, OpenAI, and Mock provider
- Concurrent Task Processing: Semaphore-based concurrency limiting (max = CPU cores)
- Retry Logic: Exponential backoff for failed LLM calls
- Real-time Statistics: Live metrics dashboard with task and performance monitoring
- PostgreSQL Storage: Persistent message and log storage
- Docker Ready: Complete containerization with docker-compose
FastAPI Backend
- REST API for message submission and retrieval
- Statistics endpoint for real-time metrics
- Static file serving for frontend
Async Task Processing System
- FastAPI BackgroundTasks spawn async tasks on-demand
- Semaphore limits concurrent tasks to CPU cores (configurable)
- Tasks process messages asynchronously via asyncio
- Graceful shutdown handling
LLM Provider Abstraction
- Unified interface for multiple LLM providers
- Support for Anthropic Claude, DeepSeek, OpenAI
- Mock provider for testing without API keys
PostgreSQL Database
- Message storage with status tracking (pending → processing → completed/failed)
- Application logging to stdout for operational debugging
- Efficient indexing for performance
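The status lifecycle and indexing can be sketched in a few lines of Python, using sqlite3 purely for brevity (the service uses PostgreSQL; the table and column names here are illustrative assumptions, not the project's actual schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE messages (
        message_id TEXT PRIMARY KEY,
        question   TEXT NOT NULL,
        status     TEXT NOT NULL DEFAULT 'pending',
        response   TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")
# A composite index on (status, created_at) keeps status polls and dashboards fast
conn.execute("CREATE INDEX idx_messages_status ON messages(status, created_at)")

# Walk one message through pending -> processing -> completed
conn.execute("INSERT INTO messages (message_id, question) VALUES ('m1', 'What is ferritin?')")
conn.execute("UPDATE messages SET status = 'processing' WHERE message_id = 'm1'")
conn.execute("UPDATE messages SET status = 'completed', response = '...' WHERE message_id = 'm1'")

status = conn.execute("SELECT status FROM messages WHERE message_id = 'm1'").fetchone()[0]
print(status)  # completed
```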
Frontend
- Simple HTML + Vanilla JS interface
- Polling-based message status updates
- Live statistics dashboard
- Responsive design
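The frontend's polling loop boils down to "fetch status until it is terminal." A minimal Python sketch of the same pattern (the real client is vanilla JS; `fetch_status` below is an in-memory stand-in, not the project's API):

```python
import asyncio
from itertools import count

async def poll_until_done(fetch_status, interval=0.001):
    """Poll a status callable until it reports a terminal state."""
    while True:
        state = await fetch_status()
        if state["status"] in ("completed", "failed"):
            return state
        await asyncio.sleep(interval)

# Demo: a fake endpoint that reports pending twice, then completed
calls = count()
async def fetch_status():
    if next(calls) < 2:
        return {"status": "pending"}
    return {"status": "completed", "response": "..."}

result = asyncio.run(poll_until_done(fetch_status))
print(result["status"])  # completed
```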
This system uses asyncio tasks with semaphore-based concurrency limiting instead of a traditional worker pool:
Why This Approach:
- I/O-Bound Workload: 95%+ of time spent waiting for LLM API responses, not CPU computation
- Lightweight: Async tasks use KB vs MB per process, enabling higher concurrency
- Fast Scaling: Can handle hundreds of concurrent requests efficiently
- Simpler Implementation: No inter-process communication complexity, easier debugging
- Natural Fit: FastAPI and LLM SDKs (Anthropic, OpenAI) are already async-native
How It Works:
- User submits a question via POST /chat
- Message is saved to the database with status='pending'
- FastAPI BackgroundTasks spawns an async task via asyncio.create_task()
- Task acquires the semaphore (blocks if at the MAX_WORKERS limit)
- Task calls the LLM with retry logic and updates the database
- Task releases the semaphore on completion
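The flow above can be sketched with plain asyncio, stubbing the LLM call and the database with in-memory stand-ins (names like `handle_message` are illustrative, not the project's actual identifiers):

```python
import asyncio
import os

MAX_WORKERS = os.cpu_count() or 4
messages = {}  # in-memory stand-in for the messages table

async def call_llm(question):
    await asyncio.sleep(0.01)  # simulate waiting on the LLM API (I/O-bound)
    return f"answer to: {question}"

async def handle_message(message_id, semaphore):
    async with semaphore:  # blocks here when MAX_WORKERS tasks are active
        messages[message_id]["status"] = "processing"
        try:
            answer = await call_llm(messages[message_id]["question"])
            messages[message_id].update(status="completed", response=answer)
        except Exception as exc:
            messages[message_id].update(status="failed", error=str(exc))

async def main():
    semaphore = asyncio.Semaphore(MAX_WORKERS)
    tasks = []
    for i in range(10):  # simulate ten POST /chat submissions
        mid = f"msg-{i}"
        messages[mid] = {"question": f"q{i}", "status": "pending"}
        tasks.append(asyncio.create_task(handle_message(mid, semaphore)))
    await asyncio.gather(*tasks)

asyncio.run(main())
print(sum(m["status"] == "completed" for m in messages.values()))  # 10
```

Because tasks queue up on the semaphore rather than in an explicit queue, no more than MAX_WORKERS LLM calls are ever in flight at once.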
Concurrency Limiting:
- Semaphore is set to MAX_WORKERS (defaults to CPU core count per spec)
- Tasks block on semaphore acquisition when the limit is reached
- No explicit queue - tasks wait on the semaphore internally
Trade-offs:
- ✅ Better performance for I/O-bound tasks than multiprocessing
- ✅ Lower memory footprint (~10-100KB per task vs ~10MB per process)
- ✅ Simpler codebase, easier to maintain
- ❌ No individual "worker" objects to track lifecycle
- ❌ No concept of "idle workers" (tasks either exist/active or don't exist)
- ❌ No visible queue depth (blocking happens at semaphore level)
```shell
# 1. Clone/navigate to project directory
cd one_doc_ex

# 2. Create .env file from example
cp .env.example .env

# 3. (Optional) Add your API keys to .env
nano .env  # Or your favorite editor

# 4. Start services
docker-compose up --build

# 5. Open browser to http://localhost:8000
```

```shell
# 1. Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# 2. Install dependencies
pip install -r requirements.txt

# 3. Set up PostgreSQL
# Install PostgreSQL 15+ and create database:
createdb medchat
createuser medchat_user -P  # Set password: medchat_password

# 4. Create .env file
cp .env.example .env
# Edit .env with your database URL and API keys

# 5. Initialize database
python scripts/init_db.py

# 6. Run application
uvicorn app.main:app --reload --port 8000

# 7. Open browser to http://localhost:8000
```

All configuration is via environment variables (the .env file):
- SERVER_PORT: HTTP server port (default: 8000)
- HOST: Bind host (default: 0.0.0.0)
- DATABASE_URL: PostgreSQL connection string
- LLM_PROVIDER: Provider to use (anthropic, deepseek, openai, mock)
- LLM_MODEL: Model name (e.g., claude-3-5-sonnet-20241022)
- LLM_TEMPERATURE: Temperature setting (default: 0.7)
- LLM_MAX_TOKENS: Max tokens per request (default: 2000)
- ANTHROPIC_API_KEY: Anthropic API key (for Claude)
- DEEPSEEK_API_KEY: DeepSeek API key
- OPENAI_API_KEY: OpenAI API key
- MAX_WORKERS: Max concurrent workers (default: CPU count)
- WORKER_IDLE_TIMEOUT: Seconds before an idle worker exits (default: 60)
- RETRY_DELAY: Base delay between retries in seconds (default: 2)
- MAX_RETRIES: Maximum retry attempts (default: 3)
- QUEUE_MAXSIZE: Maximum queue size (default: 1000)
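RETRY_DELAY and MAX_RETRIES drive the exponential backoff mentioned earlier; a minimal sketch of that policy (the project's actual retry code may differ in detail):

```python
import asyncio

RETRY_DELAY = 2   # base delay in seconds
MAX_RETRIES = 3   # total attempts

async def with_retries(call, delay=RETRY_DELAY, retries=MAX_RETRIES):
    """Retry an async call with exponential backoff: delay, delay*2, delay*4, ..."""
    for attempt in range(retries):
        try:
            return await call()
        except Exception:
            if attempt == retries - 1:
                raise  # out of attempts: let the caller mark the message failed
            await asyncio.sleep(delay * 2 ** attempt)

# Demo: fail twice, then succeed (delay shortened so the demo runs instantly)
attempts = 0
async def flaky_llm_call():
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError("transient LLM error")
    return "ok"

result = asyncio.run(with_retries(flaky_llm_call, delay=0.001))
print(result)  # ok
```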
Submit a new medical question.

Request:

```json
{
  "question": "What are the symptoms of iron deficiency?"
}
```

Response:

```json
{
  "messageId": "uuid-string"
}
```

Get status and response for a message.
Responses:

Pending (waiting for a worker):

```json
{
  "status": "pending"
}
```

Processing (actively being processed):

```json
{
  "status": "processing"
}
```

Completed:

```json
{
  "status": "completed",
  "response": "Iron deficiency commonly causes..."
}
```

Failed:

```json
{
  "status": "failed",
  "error": "LLM request failed after retries"
}
```

Get real-time system statistics.
Response:

```json
{
  "messagesProcessed": 120,
  "messagesSucceeded": 110,
  "messagesFailed": 10,
  "totalRetries": 27,
  "averageProcessingTimeMs": 850,
  "averageTokensPerMessage": 430,
  "totalTokensUsed": 51600,
  "activeWorkers": 4
}
```

Note on Statistics:
- activeWorkers: current number of active async tasks processing messages
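These fields are simple aggregates over per-message records; a sketch of how they might be computed (field names follow the response shape above; the record layout and numbers are illustrative, the real service derives them from the database):

```python
# Hypothetical per-message records: processing time, token usage, retry count
records = [
    {"status": "completed", "ms": 800, "tokens": 400, "retries": 0},
    {"status": "completed", "ms": 900, "tokens": 460, "retries": 1},
    {"status": "failed",    "ms": 650, "tokens": 0,   "retries": 3},
]

def statistics(records):
    processed = len(records)
    succeeded = sum(r["status"] == "completed" for r in records)
    return {
        "messagesProcessed": processed,
        "messagesSucceeded": succeeded,
        "messagesFailed": processed - succeeded,
        "totalRetries": sum(r["retries"] for r in records),
        "averageProcessingTimeMs": sum(r["ms"] for r in records) // processed,
        "totalTokensUsed": sum(r["tokens"] for r in records),
    }

print(statistics(records)["messagesSucceeded"])  # 2
```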
Health check endpoint.
```shell
# Install dev dependencies
pip install -r requirements.txt

# Run all tests
pytest

# Run with coverage
pytest --cov=app --cov-report=html

# Run specific test file
pytest tests/test_llm_providers/test_mock_provider.py

# Run with output
pytest -v -s
```

```
tests/
├── conftest.py             # Shared fixtures
├── test_api/               # API endpoint tests
├── test_services/          # Service layer tests
├── test_workers/           # Worker system tests
└── test_llm_providers/     # LLM provider tests
```
```
one_doc_ex/
├── app/
│   ├── api/               # API endpoints
│   ├── llm_providers/     # LLM provider implementations
│   ├── models.py          # Database models
│   ├── prompts/           # System prompts
│   ├── services/          # Business logic services
│   ├── workers/           # Async worker system
│   ├── config.py          # Configuration
│   ├── database.py        # Database connection
│   ├── main.py            # FastAPI application
│   └── schemas.py         # Pydantic schemas
├── static/                # Frontend files
├── tests/                 # Test suite
├── scripts/               # Utility scripts
└── docker-compose.yml     # Docker orchestration
```
- Create a new provider class in app/llm_providers/:

```python
from app.llm_providers.base import BaseLLMProvider, LLMResponse

class MyProvider(BaseLLMProvider):
    async def generate(self, prompt: str, system_prompt: str) -> LLMResponse:
        # Implement the API call
        pass

    def validate_config(self) -> bool:
        # Validate configuration
        pass
```

- Register it in app/services/llm_service.py:

```python
provider_map = {
    "my_provider": MyProvider,
    # ... existing providers
}
```

- Add configuration in app/config.py if needed
- Update .env.example with the new provider option
```shell
# Docker logs
docker-compose logs -f app

# Message status
docker-compose exec postgres psql -U medchat_user -d medchat \
  -c "SELECT message_id, status, created_at FROM messages ORDER BY created_at DESC LIMIT 10;"
```

Access the built-in statistics dashboard at http://localhost:8000
Metrics updated every 5 seconds:
- Messages processed, succeeded, failed
- Success rate
- Average response time
- Pending message count (no explicit queue; derived from message status)
- Active tasks
- Token usage
```shell
# Check PostgreSQL is running
docker-compose ps postgres

# Check connection
docker-compose exec postgres psql -U medchat_user -d medchat -c "SELECT 1;"

# Reset database
docker-compose down -v
docker-compose up -d postgres
python scripts/init_db.py
```

```shell
# Test with mock provider (no API key needed)
LLM_PROVIDER=mock docker-compose up

# Verify API key is set
docker-compose exec app env | grep API_KEY

# Check provider logs
docker-compose logs app | grep -i "llm\|provider\|anthropic\|deepseek"
```

```shell
# Check worker pool status via API
curl http://localhost:8000/api/statistics

# Check queue length
docker-compose exec postgres psql -U medchat_user -d medchat \
  -c "SELECT status, COUNT(*) FROM messages GROUP BY status;"

# Restart workers
docker-compose restart app
```

```shell
# Increase for higher concurrency (I/O-bound workload)
MAX_WORKERS=16 docker-compose up
# Note: can exceed CPU count for I/O-bound tasks
```

```shell
# Faster retries
RETRY_DELAY=1 MAX_RETRIES=5 docker-compose up

# Slower, more patient retries
RETRY_DELAY=5 MAX_RETRIES=3 docker-compose up
```

```sql
-- Add indexes if queries are slow
CREATE INDEX IF NOT EXISTS idx_messages_status_created
ON messages(status, created_at DESC);

-- Analyze query performance
EXPLAIN ANALYZE SELECT * FROM messages
WHERE status = 'processing'
ORDER BY created_at DESC;
```

- API Keys: Never commit the .env file with real API keys
- Database: Change default credentials in production
- HTTPS: Use reverse proxy (nginx) with SSL in production
- Rate Limiting: Add rate limiting for public deployments
- Authentication: Add auth middleware for production use
This project is for educational/assignment purposes.
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
- FastAPI for the excellent async web framework
- Anthropic, DeepSeek, and OpenAI for LLM APIs
- PostgreSQL for robust data storage