Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions cortex-mem-core/src/search/vector_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::sync::mpsc;
use tracing::{debug, info, warn};

/// Search options
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct SearchOptions {
/// Maximum number of results
pub limit: usize,
Expand All @@ -27,6 +27,10 @@ pub struct SearchOptions {
pub root_uri: Option<String>,
/// Enable recursive search
pub recursive: bool,
/// Precomputed intent from LLM analysis.
/// If provided, semantic_search skips intent analysis and reuses this intent.
/// This reduces LLM calls from 5 per search → 1 per search.
pub precomputed_intent: Option<Arc<EnhancedQueryIntent>>,
}

impl Default for SearchOptions {
Expand All @@ -36,6 +40,7 @@ impl Default for SearchOptions {
threshold: 0.6,
root_uri: None,
recursive: true,
precomputed_intent: None,
}
}
}
Expand Down Expand Up @@ -262,13 +267,30 @@ impl VectorSearchEngine {
Some((scope, owner_id, memory_id))
}

/// Parse root_uri to extract (scope, owner_id) for filtering.
/// e.g. "cortex://session/wecom-alis" -> Some(("session", "wecom-alis"))
fn parse_root_uri(root_uri: &str) -> Option<(String, String)> {
let stripped = root_uri.strip_prefix("cortex://")?;
let parts: Vec<&str> = stripped.splitn(3, '/').collect();
if parts.len() < 2 {
return None;
}
Some((parts[0].to_string(), parts[1].to_string()))
}

/// Semantic search using vector similarity
pub async fn semantic_search(
&self,
query: &str,
options: &SearchOptions,
) -> Result<Vec<SearchResult>> {
let intent = self.analyze_intent(query).await?;
// Reuse precomputed intent if available (reduces LLM calls from 5 → 1 per search)
let intent = if let Some(ref precomputed) = options.precomputed_intent {
info!("semantic_search: reusing precomputed intent (type={:?})", precomputed.intent_type);
(**precomputed).clone()
} else {
self.analyze_intent(query).await?
};
let query_text = if intent.rewritten_query.trim().is_empty() {
query
} else {
Expand All @@ -279,7 +301,13 @@ impl VectorSearchEngine {

let mut filters = crate::types::Filters::default();
if let Some(scope) = &options.root_uri {
filters.uri_prefix = Some(scope.clone());
// Set owner_scope + uri_prefix so qdrant-level filtering uses exact scope
if let Some((owner_scope, _owner_id)) = Self::parse_root_uri(scope) {
filters.owner_scope = Some(owner_scope);
filters.uri_prefix = Some(scope.clone());
} else {
filters.uri_prefix = Some(scope.clone());
}
}

let scored = self
Expand Down Expand Up @@ -355,8 +383,13 @@ impl VectorSearchEngine {
query: &str,
options: &SearchOptions,
) -> Result<Vec<SearchResult>> {
// 1. LLM 统一意图分析(单次请求)
let intent = self.analyze_intent(query).await?;
// Reuse precomputed intent if available (reduces LLM calls from 5 → 1 per search)
let intent = if let Some(ref precomputed) = options.precomputed_intent {
info!("layered_semantic_search: reusing precomputed intent (type={:?})", precomputed.intent_type);
(**precomputed).clone()
} else {
self.analyze_intent(query).await?
};

info!(
"Intent analysis: type={:?}, entities={:?}, keywords={:?}, rewritten='{}'",
Expand All @@ -376,6 +409,9 @@ impl VectorSearchEngine {
);
let mut l0_filters = crate::types::Filters::with_layer("L0");
if let Some(scope) = &options.root_uri {
if let Some((owner_scope, _owner_id)) = Self::parse_root_uri(scope) {
l0_filters.owner_scope = Some(owner_scope);
}
l0_filters.uri_prefix = Some(scope.clone());
}

Expand Down Expand Up @@ -560,7 +596,7 @@ impl VectorSearchEngine {
}

/// 统一意图分析(优先使用 LLM 单次调用,LLM 不可用时使用最小 fallback)
async fn analyze_intent(&self, query: &str) -> Result<EnhancedQueryIntent> {
pub async fn analyze_intent(&self, query: &str) -> Result<EnhancedQueryIntent> {
if self.enable_intent_analysis {
if let Some(llm) = &self.llm_client {
match self.analyze_intent_with_llm(llm.as_ref(), query).await {
Expand Down
2 changes: 2 additions & 0 deletions cortex-mem-core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ pub struct Filters {
pub max_importance: Option<f32>,
/// URI prefix filter for scope-based searching
pub uri_prefix: Option<String>,
/// Owner scope hint: "session", "agent", or "user" (used with uri_prefix to construct qdrant filter)
pub owner_scope: Option<String>,
pub custom: HashMap<String, serde_json::Value>,
}

Expand Down
15 changes: 15 additions & 0 deletions cortex-mem-core/src/vector_store/qdrant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,21 @@ impl QdrantVectorStore {
fn filters_to_qdrant_filter(&self, filters: &Filters) -> Option<Filter> {
let mut conditions = Vec::new();

// Filter by scope + uri_prefix as a Match on the uri field
// This enables session/agent/user scope filtering at the qdrant level
if filters.owner_scope.is_some() && filters.uri_prefix.is_some() {
let uri_prefix = filters.uri_prefix.as_ref().unwrap();
conditions.push(Condition {
condition_one_of: Some(condition::ConditionOneOf::Field(FieldCondition {
key: "uri".to_string(),
r#match: Some(Match {
match_value: Some(r#match::MatchValue::Text(uri_prefix.clone())),
}),
..Default::default()
})),
});
}

if let Some(user_id) = &filters.user_id {
conditions.push(Condition {
condition_one_of: Some(condition::ConditionOneOf::Field(FieldCondition {
Expand Down
12 changes: 12 additions & 0 deletions cortex-mem-service/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM ubuntu:24.04

RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*

COPY target/release/cortex-mem-service /cortex-mem-service

ENV CORTEX_DATA_DIR=/mnt/sata-trace/cortex-mem/data
ENV CORTEX_TENANT_ID=tenant_claw

ENTRYPOINT ["/cortex-mem-service"]
1 change: 1 addition & 0 deletions cortex-mem-service/src/handlers/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ pub async fn explore(
threshold: 0.3, // Lower threshold for exploration
root_uri: Some(req.start_uri.clone()),
recursive: true,
precomputed_intent: None,
};

let search_results = vector_engine
Expand Down
9 changes: 9 additions & 0 deletions cortex-mem-service/src/handlers/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async fn search_layered(
threshold: min_score,
root_uri: None,
recursive: true,
precomputed_intent: None,
};
let mut semantic_options = options.clone();
semantic_options.threshold = (min_score * 0.5).max(0.0);
Expand All @@ -85,6 +86,14 @@ async fn search_layered(

let profile = build_query_profile(query);

// [优化] 一次性 intent 分析,后续 semantic_search 复用(5次LLM→1次)
let precomputed_intent = vector_engine.analyze_intent(query).await?;
let precomputed_intent = Arc::new(precomputed_intent);
options.precomputed_intent = Some(precomputed_intent.clone());
semantic_options.precomputed_intent = Some(precomputed_intent.clone());
tracing::info!("[search优化] intent precomputed: type={:?}, keywords={:?}",
precomputed_intent.intent_type, precomputed_intent.keywords);

let layered_results = vector_engine
.layered_semantic_search(query, &options)
.await
Expand Down
5 changes: 3 additions & 2 deletions cortex-mem-service/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use axum::{Router, routing::get};
use clap::Parser;
use std::fs::File;
use std::net::SocketAddr;
use std::net::{SocketAddr, IpAddr};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tower_http::cors::CorsLayer;
Expand Down Expand Up @@ -139,7 +139,8 @@ async fn main() -> anyhow::Result<()> {
.with_state(state);

// Start server
let addr = SocketAddr::from(([127, 0, 0, 1], cli.port));
let ip: IpAddr = cli.host.parse().unwrap_or(IpAddr::from([0, 0, 0, 0]));
let addr = SocketAddr::from((ip, cli.port));
info!("Server listening on http://{}", addr);

let listener = tokio::net::TcpListener::bind(addr).await?;
Expand Down