From 0e7b43e1663c8e0e785f74a729079273f480e87b Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 16 Jan 2026 13:32:48 -0800 Subject: [PATCH 1/5] refactor: use LRU cache for session contexts in get_session_context Previously, get_session_context only cached session contexts when options matched hardcoded defaults. This meant setting LANCE_MEM_POOL_SIZE env var would cause cache misses every time, creating a new SessionContext on each call. Now uses an LRU cache (size 4) keyed by resolved configuration values, so repeated calls with the same effective configuration reuse cached contexts regardless of whether values come from defaults, explicit options, or environment variables. Co-Authored-By: Claude Opus 4.5 --- rust/lance-datafusion/src/exec.rs | 78 +++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 19 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index c61ff26419a..751d83d10f5 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -6,7 +6,7 @@ use std::{ collections::HashMap, fmt::{self, Formatter}, - sync::{Arc, LazyLock, Mutex}, + sync::{Arc, Mutex, OnceLock}, time::Duration, }; @@ -385,28 +385,68 @@ pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext { ctx } -static DEFAULT_SESSION_CONTEXT: LazyLock = - LazyLock::new(|| new_session_context(&LanceExecutionOptions::default())); +/// Cache key for session contexts based on resolved configuration values. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct SessionContextCacheKey { + mem_pool_size: u64, + max_temp_directory_size: u64, + target_partition: Option, + use_spilling: bool, +} -static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock = LazyLock::new(|| { - new_session_context(&LanceExecutionOptions { - use_spilling: true, - ..Default::default() - }) -}); +impl SessionContextCacheKey { + fn from_options(options: &LanceExecutionOptions) -> Self { + Self { + mem_pool_size: options.mem_pool_size(), + max_temp_directory_size: options.max_temp_directory_size(), + target_partition: options.target_partition, + use_spilling: options.use_spilling(), + } + } +} + +struct CachedSessionContext { + context: SessionContext, + last_access: std::time::Instant, +} + +fn get_session_cache() -> &'static Mutex> { + static SESSION_CACHE: OnceLock>> = + OnceLock::new(); + SESSION_CACHE.get_or_init(|| Mutex::new(HashMap::new())) +} pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext { - if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE - && options.max_temp_directory_size() == DEFAULT_LANCE_MAX_TEMP_DIRECTORY_SIZE - && options.target_partition.is_none() - { - return if options.use_spilling() { - DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone() - } else { - DEFAULT_SESSION_CONTEXT.clone() - }; + let key = SessionContextCacheKey::from_options(options); + let mut cache = get_session_cache().lock().unwrap(); + + // If key exists, update access time and return + if let Some(entry) = cache.get_mut(&key) { + entry.last_access = std::time::Instant::now(); + return entry.context.clone(); + } + + // Evict least recently used entry if cache is full + const MAX_CACHE_SIZE: usize = 4; + if cache.len() >= MAX_CACHE_SIZE { + if let Some(lru_key) = cache + .iter() + .min_by_key(|(_, v)| v.last_access) + .map(|(k, _)| k.clone()) + { + cache.remove(&lru_key); + } } - new_session_context(options) + + let context = new_session_context(options); + cache.insert( + key, + CachedSessionContext { + context: context.clone(), + last_access: std::time::Instant::now(), + }, + ); + context } fn get_task_context( From 822c5f114ebc408a802184a6c4d612c0dc9bdbdb Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 16 Jan 2026 13:48:29 -0800 Subject: [PATCH 2/5] test: add unit tests for session context LRU cache Tests verify: - Caching works (same options reuse cached context) - Different options create separate cache entries - LRU eviction order (least recently accessed is evicted first) Co-Authored-By: Claude Opus 4.5 --- rust/lance-datafusion/src/exec.rs | 103 ++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 751d83d10f5..192956096bc 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -929,3 +929,106 @@ impl ExecutionPlan for StrictBatchSizeExec { true } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_session_context_cache() { + let cache = get_session_cache(); + + // Clear any existing entries from other tests + cache.lock().unwrap().clear(); + + // Create first session with default options + let opts1 = LanceExecutionOptions::default(); + let _ctx1 = get_session_context(&opts1); + + { + let cache_guard = cache.lock().unwrap(); + assert_eq!(cache_guard.len(), 1); + } + + // Same options should reuse cached session (no new entry) + let _ctx1_again = get_session_context(&opts1); + { + let cache_guard = cache.lock().unwrap(); + assert_eq!(cache_guard.len(), 1); + } + + // Different options should create new entry + let opts2 = LanceExecutionOptions { + use_spilling: true, + ..Default::default() + }; + let _ctx2 = get_session_context(&opts2); + { + let cache_guard = cache.lock().unwrap(); + assert_eq!(cache_guard.len(), 2); + } + } + + #[test] + fn test_session_context_cache_lru_eviction() { + let cache = get_session_cache(); + + // Clear any existing entries from other tests + cache.lock().unwrap().clear(); + + // Create 4 different configurations to fill the cache + let configs: Vec = (0..4) + .map(|i| LanceExecutionOptions { + mem_pool_size: Some((i + 1) as u64 * 1024 * 1024), + ..Default::default() + }) + .collect(); + + for config in &configs { + let _ctx = get_session_context(config); + } + + { + let cache_guard = cache.lock().unwrap(); + assert_eq!(cache_guard.len(), 4); + } + + // Access config[0] to make it more recently used than config[1] + // (config[0] was inserted first, so without this access it would be evicted) + std::thread::sleep(std::time::Duration::from_millis(1)); + let _ctx = get_session_context(&configs[0]); + + // Add a 5th configuration - should evict config[1] (now least recently used) + let opts5 = LanceExecutionOptions { + mem_pool_size: Some(5 * 1024 * 1024), + ..Default::default() + }; + let _ctx5 = get_session_context(&opts5); + + { + let cache_guard = cache.lock().unwrap(); + assert_eq!(cache_guard.len(), 4); + + // config[0] should still be present (was accessed recently) + let key0 = SessionContextCacheKey::from_options(&configs[0]); + assert!( + cache_guard.contains_key(&key0), + "config[0] should still be cached after recent access" + ); + + // config[1] should be evicted (was least recently used) + let key1 = SessionContextCacheKey::from_options(&configs[1]); + assert!( + !cache_guard.contains_key(&key1), + "config[1] should have been evicted" + ); + + // New config should be present + let key5 = SessionContextCacheKey::from_options(&opts5); + assert!( + cache_guard.contains_key(&key5), + "new config should be cached" + ); + } + } +} From ff9160c57dc4a903aaa92a613a959424df60f6b2 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 16 Jan 2026 14:45:54 -0800 Subject: [PATCH 3/5] fix: handle PoisonError in session context cache Use unwrap_or_else with into_inner() to recover from PoisonError instead of panicking. The cache data remains valid even if a thread panicked while holding the lock. Co-Authored-By: Claude Opus 4.5 --- rust/lance-datafusion/src/exec.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 192956096bc..ce09d4ca03f 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -418,7 +418,9 @@ fn get_session_cache() -> &'static Mutex SessionContext { let key = SessionContextCacheKey::from_options(options); - let mut cache = get_session_cache().lock().unwrap(); + let mut cache = get_session_cache() + .lock() + .unwrap_or_else(|e| e.into_inner()); // If key exists, update access time and return if let Some(entry) = cache.get_mut(&key) { From 75e7e600ebaffa89f992480f5505d4fc7b590969 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 16 Jan 2026 15:51:25 -0800 Subject: [PATCH 4/5] feat: add LANCE_SESSION_CACHE_SIZE env var for cache size config Co-Authored-By: Claude Opus 4.5 --- rust/lance-datafusion/src/exec.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index ce09d4ca03f..da1dc58048d 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -416,6 +416,17 @@ fn get_session_cache() -> &'static Mutex usize { + const DEFAULT_CACHE_SIZE: usize = 4; + static MAX_CACHE_SIZE: OnceLock = OnceLock::new(); + *MAX_CACHE_SIZE.get_or_init(|| { + std::env::var("LANCE_SESSION_CACHE_SIZE") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_CACHE_SIZE) + }) +} + pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext { let key = SessionContextCacheKey::from_options(options); let mut cache = get_session_cache() @@ -429,8 +440,7 @@ pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext { } // Evict least recently used entry if cache is full - const MAX_CACHE_SIZE: usize = 4; - if cache.len() >= MAX_CACHE_SIZE { + if cache.len() >= get_max_cache_size() { if let Some(lru_key) = cache .iter() .min_by_key(|(_, v)| v.last_access) From d0bf273e090ec0e8ff2fabc59218bd64e8e592ee Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 16 Jan 2026 16:50:03 -0800 Subject: [PATCH 5/5] fix: serialize session cache tests to avoid race conditions Tests share global cache state and were failing when run in parallel. Add a test-only Mutex to ensure they run sequentially. Co-Authored-By: Claude Opus 4.5 --- rust/lance-datafusion/src/exec.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index da1dc58048d..9eed7f92bfc 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -946,8 +946,12 @@ impl ExecutionPlan for StrictBatchSizeExec { mod tests { use super::*; + // Serialize cache tests since they share global state + static CACHE_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); + #[test] fn test_session_context_cache() { + let _lock = CACHE_TEST_LOCK.lock().unwrap(); let cache = get_session_cache(); // Clear any existing entries from other tests @@ -983,6 +987,7 @@ mod tests { #[test] fn test_session_context_cache_lru_eviction() { + let _lock = CACHE_TEST_LOCK.lock().unwrap(); let cache = get_session_cache(); // Clear any existing entries from other tests