Modular ML inference pipeline for real-time recommendation systems.
Companion open-source implementation for the paper:
*Scalable ML Inference for Real-Time Recommendations*, International Journal of Information Technology and Computer Engineering (IJITCE), Volume 13, Issue 4, 2025. ISSN 2347-3657. DOI: 10.62647/IJITCE2025V13I4PP1-8
Building a recommendation system that scores items in real time requires assembling several components: candidate retrieval, feature assembly, model scoring, and re-ranking. This library provides each stage as an independent, pluggable component and an orchestrator that chains them with per-stage latency tracking and an optional latency budget.
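The chaining described above can be pictured with a small, library-independent sketch. `run_stages` and the toy stage functions are hypothetical names used only for illustration; they are not the scaleinfer API, and how the real orchestrator reacts when the budget is exceeded is not specified here, so the sketch only reports it.

```python
import time

# Hypothetical helper, not part of scaleinfer: run stages in order,
# record wall-clock time per stage, and compare the total to a budget.
def run_stages(stages, query, budget_ms=None):
    timings = {}
    value = query
    for name, stage in stages:
        start = time.perf_counter()
        value = stage(value)  # output of one stage feeds the next
        timings[name] = (time.perf_counter() - start) * 1000.0
    total_ms = sum(timings.values())
    over_budget = budget_ms is not None and total_ms > budget_ms
    return value, timings, over_budget


# Toy stand-ins for the four stages named in the text.
stages = [
    ("retrieve", lambda q: [f"item_{i:03d}" for i in range(5)]),
    ("features", lambda ids: [(i, {"popularity": 1.0 / (n + 1)}) for n, i in enumerate(ids)]),
    ("score", lambda rows: [(i, f["popularity"]) for i, f in rows]),
    ("rank", lambda scored: sorted(scored, key=lambda p: p[1], reverse=True)[:3]),
]

result, timings, over_budget = run_stages(stages, query=None, budget_ms=100)
print(result)
print({name: round(ms, 3) for name, ms in timings.items()})
```

Because each stage is just a callable with a name, any one of them can be swapped without touching the others, which is the property the pluggable design aims for.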
The architecture is based on production systems serving millions of users at sub-100ms latency, including the Intuit SER system (97% latency reduction from 1.2s to 40ms) and Expedia's ranking pipeline.
| Module | Purpose |
|---|---|
| `scaleinfer.retrieve` | Candidate generation via cosine similarity or precomputed lists |
| `scaleinfer.features` | Thread-safe feature store with static hydration and dynamic computation |
| `scaleinfer.score` | Model scoring with LRU caching and pluggable score functions |
| `scaleinfer.rank` | Score-based ranking with optional MMR diversity |
| `scaleinfer.pipeline` | End-to-end orchestrator with per-stage latency tracking |
| `scaleinfer.optimize` | Pipeline profiler with p50/p95/p99 breakdown per stage |
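
For readers unfamiliar with MMR (maximal marginal relevance), the following is a minimal sketch of greedy MMR re-ranking over cosine similarity. It is not the `scaleinfer.rank` implementation: the function name `mmr_rerank` and the exact weighting formula are assumptions, chosen so that a `diversity_weight` of 0 reduces to plain score ordering.

```python
import numpy as np

# Illustrative only: a generic greedy MMR, not scaleinfer's Ranker.
def mmr_rerank(scores, embeddings, top_k, diversity_weight):
    scores = np.asarray(scores, dtype=float)
    embeddings = np.asarray(embeddings, dtype=float)

    # Normalize rows so that dot products are cosine similarities.
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    unit = embeddings / np.clip(norms, 1e-12, None)
    sim = unit @ unit.T

    selected = []
    remaining = list(range(len(scores)))
    while remaining and len(selected) < top_k:
        if selected:
            # Highest similarity of each candidate to anything already picked.
            redundancy = sim[np.ix_(remaining, selected)].max(axis=1)
        else:
            redundancy = np.zeros(len(remaining))
        # Assumed trade-off: relevance minus weighted redundancy.
        mmr = (1.0 - diversity_weight) * scores[remaining] - diversity_weight * redundancy
        best = remaining[int(np.argmax(mmr))]
        selected.append(best)
        remaining.remove(best)
    return selected


scores = [0.9, 0.85, 0.5]
embeddings = [[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]]  # items 0 and 1 are duplicates

print(mmr_rerank(scores, embeddings, top_k=2, diversity_weight=0.0))  # [0, 1]
print(mmr_rerank(scores, embeddings, top_k=2, diversity_weight=0.5))  # [0, 2]
```

With no diversity penalty the two near-identical items take both slots; with a penalty of 0.5 the second slot goes to the lower-scored but dissimilar item.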
pip install scaleinfer

Or with uv:

uv add scaleinfer

import numpy as np
from scaleinfer.retrieve.backends import InMemoryRetriever
from scaleinfer.features.assembler import FeatureStore, FeatureAssembler
from scaleinfer.score.scorer import ModelScorer, LRUCache
from scaleinfer.rank.ranker import Ranker
from scaleinfer.pipeline.pipeline import RecommendationPipeline
# Build the retrieval index from your own item IDs and their embedding vectors
retriever = InMemoryRetriever()
retriever.add_items(item_ids, item_embeddings)
# Set up feature store
store = FeatureStore()
store.bulk_set({"item_001": {"popularity": 0.9, "recency": 0.7}})
assembler = FeatureAssembler(feature_store=store)
# Configure scoring with caching (my_model stands in for your own trained model)
scorer = ModelScorer(score_fn=my_model.predict, cache=LRUCache(maxsize=50000))
# Configure ranking
ranker = Ranker(top_k=20, diversity_weight=0.2)
# Assemble and run the pipeline
pipeline = RecommendationPipeline(
    retriever=retriever,
    feature_assembler=assembler,
    scorer=scorer,
    ranker=ranker,
    latency_budget_ms=100,
)
result = pipeline.recommend(query_vector)
print(result.summary())

from scaleinfer.optimize.profiler import PipelineProfiler
profiler = PipelineProfiler(pipeline)
report = profiler.run(query_vectors=test_queries)
print(report.summary())
print("Bottleneck:", report.bottleneck_stage())

Development setup, tests, and formatting:

uv sync --all-extras
uv run pytest tests/ -v --cov=src
uv run isort src/ tests/ && uv run black src/ tests/

If you use this library in your research, please cite the paper:
Scalable ML Inference for Real-Time Recommendations.
International Journal of Information Technology and Computer Engineering (IJITCE),
Volume 13, Issue 4, 2025. DOI: 10.62647/IJITCE2025V13I4PP1-8
License: Apache 2.0