Add fast raw memory search path #189
Code Review
This pull request enhances the memory search functionality by adding support for 'snippet' and 'code' domains, implementing optional synthesized answer generation, and introducing latency tracking with percentile statistics. It also adds caching mechanisms for retrieval plans and profile catalogs to improve efficiency. Review feedback highlights a thread-safety concern when iterating over the latency samples dictionary, suggests parallelizing I/O-bound search tasks using asyncio.gather to reduce total latency, and recommends adding an eviction policy to the profile catalog cache to prevent potential memory leaks.
| def _latency_stats() -> Dict[str, Dict[str, float]]: | ||
|     stats: Dict[str, Dict[str, float]] = {} | ||
|     for mode, samples in _latency_samples.items(): |
Iterating over _latency_samples.items() directly is not safe when other threads can write to the dictionary. Because _latency_samples is a defaultdict, recording latency for a mode that has not been seen before inserts a new key. If a concurrent request does that while this loop is running, the loop raises RuntimeError: dictionary changed size during iteration.
Consider wrapping the items in list() so the loop iterates over a snapshot of the key-value pairs rather than the live view.
| for mode, samples in _latency_samples.items(): | |
| for mode, samples in list(_latency_samples.items()): |
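The failure mode described in this comment can be reproduced without threads. The sketch below is illustrative only: make_samples, iterate_live, and iterate_snapshot are hypothetical names, and the "concurrent writer" is simulated by inserting a key from inside the loop, which triggers the same check a real concurrent insert would.

```python
from collections import defaultdict
from typing import DefaultDict, List, Optional


def make_samples() -> DefaultDict[str, List[float]]:
    """Build a small stand-in for the latency store (hypothetical data)."""
    samples: DefaultDict[str, List[float]] = defaultdict(list)
    samples["profile"].append(1.2)
    samples["temporal"].append(3.4)
    return samples


def iterate_live(samples: DefaultDict[str, List[float]]) -> Optional[str]:
    """Iterate the live items() view while a new key is inserted mid-loop."""
    try:
        for mode, _values in samples.items():
            # Simulates another request recording latency for an unseen mode.
            samples["new-" + mode].append(0.5)
    except RuntimeError as exc:
        return str(exc)
    return None


def iterate_snapshot(samples: DefaultDict[str, List[float]]) -> List[str]:
    """Iterate a list() snapshot, so inserts during the loop are harmless."""
    seen: List[str] = []
    for mode, _values in list(samples.items()):
        samples["new-" + mode].append(0.5)
        seen.append(mode)
    return seen


if __name__ == "__main__":
    print("live view:", iterate_live(make_samples()))
    print("snapshot:", iterate_snapshot(make_samples()))
```

The snapshot only protects the iteration itself. The sample lists are still shared objects, so any code that reads them while another thread appends needs its own care.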
| if "profile" in plan: | ||
|     results, elapsed = await _timed("profile", _search_profile, pipeline, user_id) | ||
|     latency_ms["profile"] = elapsed | ||
|     all_results.extend(results) | ||
| if "temporal" in plan: | ||
|     results, elapsed = await _timed("temporal", _search_temporal, pipeline, req.query, user_id, req.top_k) | ||
|     latency_ms["temporal"] = elapsed | ||
|     all_results.extend(results) | ||
| if "summary" in plan: | ||
|     results, elapsed = await _timed("summary", _search_summary, pipeline, req.query, user_id, req.top_k) | ||
|     latency_ms["summary"] = elapsed | ||
|     all_results.extend(results) | ||
| if "snippet" in plan: | ||
|     results, elapsed = await _timed("snippet", _search_snippet, pipeline, req.query, user_id, req.top_k) | ||
|     latency_ms["snippet"] = elapsed | ||
|     all_results.extend(results) | ||
| if "code" in plan: | ||
|     results, elapsed = await _timed("code", _search_code, pipeline, req.query, user_id, req.top_k) | ||
|     latency_ms["code"] = elapsed | ||
|     all_results.extend(results) |
The search domains are currently queried sequentially, which negates the performance benefits of having a "fast" raw search path. Since these operations are I/O bound, they should be executed in parallel using asyncio.gather to minimize total latency.
| search_modes = [] | |
| search_tasks = [] | |
| if "profile" in plan: | |
|     search_modes.append("profile") | |
|     search_tasks.append(_timed("profile", _search_profile, pipeline, user_id)) | |
| if "temporal" in plan: | |
|     search_modes.append("temporal") | |
|     search_tasks.append(_timed("temporal", _search_temporal, pipeline, req.query, user_id, req.top_k)) | |
| if "summary" in plan: | |
|     search_modes.append("summary") | |
|     search_tasks.append(_timed("summary", _search_summary, pipeline, req.query, user_id, req.top_k)) | |
| if "snippet" in plan: | |
|     search_modes.append("snippet") | |
|     search_tasks.append(_timed("snippet", _search_snippet, pipeline, req.query, user_id, req.top_k)) | |
| if "code" in plan: | |
|     search_modes.append("code") | |
|     search_tasks.append(_timed("code", _search_code, pipeline, req.query, user_id, req.top_k)) | |
| if search_tasks: | |
|     task_results = await asyncio.gather(*search_tasks) | |
|     # Pair each result with the mode that produced it; plan may be ordered differently or hold other entries. | |
|     for mode, (results, elapsed) in zip(search_modes, task_results): | |
|         latency_ms[mode] = elapsed | |
|         all_results.extend(results) |
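The parallel pattern can be tried in isolation with the sketch below. Everything in it is a stand-in rather than the PR's code: timed, fake_search, and search_all are hypothetical names, and the search itself is simulated with asyncio.sleep. It relies on one documented property of asyncio.gather: results come back in the order the awaitables were passed, so they can be paired with an explicit list of modes.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List, Tuple


async def timed(
    fn: Callable[..., Awaitable[List[Any]]], *args: Any
) -> Tuple[List[Any], float]:
    """Await fn(*args) and return (results, elapsed milliseconds)."""
    start = time.perf_counter()
    results = await fn(*args)
    return results, (time.perf_counter() - start) * 1000.0


async def fake_search(domain: str, delay_s: float) -> List[str]:
    """Hypothetical I/O-bound search: sleeps, then returns one hit."""
    await asyncio.sleep(delay_s)
    return [f"{domain}-hit"]


async def search_all(
    plan: List[str], delay_s: float = 0.05
) -> Tuple[List[str], Dict[str, float]]:
    # Fix the domain order once; the plan only decides which domains run.
    modes = [m for m in ("profile", "temporal", "summary") if m in plan]
    outcomes = await asyncio.gather(
        *(timed(fake_search, mode, delay_s) for mode in modes)
    )
    latency_ms: Dict[str, float] = {}
    all_results: List[str] = []
    # gather preserves input order, so modes and outcomes line up.
    for mode, (results, elapsed) in zip(modes, outcomes):
        latency_ms[mode] = elapsed
        all_results.extend(results)
    return all_results, latency_ms


if __name__ == "__main__":
    hits, latencies = asyncio.run(search_all(["summary", "profile"]))
    print(hits, latencies)
```

With the default return_exceptions=False, the first exception raised by any search propagates out of gather. Whether one failing domain should fail the whole request is a design decision the source does not settle.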
| self.embed_fn = embed_fn | ||
| self._snippet_stores: Dict[str, BaseVectorStore] = {} | ||
| self._profile_catalog_cache: Dict[str, tuple[float, List[Dict[str, str]], list]] = {} |
The _profile_catalog_cache uses user_id as a key but lacks an eviction policy. In a production environment with a large number of unique users, this dictionary will grow indefinitely, leading to a memory leak.
Consider using an LRU cache or implementing a simple size-based eviction mechanism to bound memory usage.
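One way to bound the cache is a small LRU wrapper around collections.OrderedDict. This is a minimal sketch under stated assumptions, not the project's implementation: BoundedLRUCache and max_entries are hypothetical names, and the cached value is treated as opaque, so the existing tuple could be stored unchanged.

```python
from collections import OrderedDict
from typing import Generic, Hashable, Optional, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class BoundedLRUCache(Generic[K, V]):
    """Size-bounded cache that evicts the least recently used entry."""

    def __init__(self, max_entries: int) -> None:
        if max_entries < 1:
            raise ValueError("max_entries must be at least 1")
        self._max_entries = max_entries
        self._entries: "OrderedDict[K, V]" = OrderedDict()

    def get(self, key: K) -> Optional[V]:
        if key not in self._entries:
            return None
        # A hit makes the entry the most recently used.
        self._entries.move_to_end(key)
        return self._entries[key]

    def put(self, key: K, value: V) -> None:
        self._entries[key] = value
        self._entries.move_to_end(key)
        # Drop the oldest entries until the cache fits the bound.
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)

    def __len__(self) -> int:
        return len(self._entries)


if __name__ == "__main__":
    cache: BoundedLRUCache[str, int] = BoundedLRUCache(max_entries=2)
    cache.put("user-1", 1)
    cache.put("user-2", 2)
    cache.get("user-1")      # touch user-1 so user-2 becomes the oldest
    cache.put("user-3", 3)   # evicts user-2
    print(len(cache), cache.get("user-2"))
```

The class is not thread-safe as written. If the pipeline is shared across threads, a lock around get and put would be needed.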
Closes #163
Summary
/v1/memory/search into a low-latency raw search path across profile, temporal, summary, snippet, and code domains
answer=true
Tests
pytest tests/api/test_dependencies_and_routes.py tests/unit/test_schemas.py -q
python3 -m ruff check src/api/schemas.py src/api/routes/memory.py src/pipelines/retrieval.py tests/api/test_dependencies_and_routes.py