Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion src/kubernetes/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::pin::Pin;
use std::sync::Arc;

use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, trace, warn};

use super::ApiFilters;
Expand Down Expand Up @@ -146,6 +146,10 @@ pub struct K8sClientPool {
kubeconfig: Kubeconfig,
clients: Arc<RwLock<HashMap<String, Client>>>,
registries: Arc<RwLock<HashMap<String, CachedRegistry>>>,
/// Per-context discovery locks prevent concurrent refreshes for the same context
/// from racing to overwrite the cached registry while still allowing different
/// contexts to discover in parallel.
discovery_locks: Arc<RwLock<HashMap<String, Arc<Mutex<()>>>>>,
/// Current active contexts (supports multi-USE)
current_contexts: Arc<RwLock<Vec<String>>>,
/// Progress reporter for query status updates
Expand Down Expand Up @@ -185,6 +189,7 @@ impl K8sClientPool {
kubeconfig: Kubeconfig::default(),
clients: Arc::new(RwLock::new(HashMap::new())),
registries: Arc::new(RwLock::new(HashMap::new())),
discovery_locks: Arc::new(RwLock::new(HashMap::new())),
current_contexts: Arc::new(RwLock::new(vec!["test-context".to_string()])),
progress,
resource_cache: ResourceCache::new().expect("Failed to create test cache"),
Expand Down Expand Up @@ -213,12 +218,30 @@ impl K8sClientPool {
kubeconfig,
clients: Arc::new(RwLock::new(HashMap::new())),
registries: Arc::new(RwLock::new(HashMap::new())),
discovery_locks: Arc::new(RwLock::new(HashMap::new())),
current_contexts: Arc::new(RwLock::new(vec![context_name])),
progress: crate::progress::create_progress_handle(),
resource_cache: ResourceCache::new()?,
})
}

/// Fetch (or lazily create) the mutex that serializes discovery for one context.
///
/// Repeated calls with the same `context` return clones of the same `Arc`, so
/// every caller for that context contends on a single mutex. A context seen for
/// the first time gets a newly created mutex stored under its name.
async fn discovery_lock_for_context(&self, context: &str) -> Arc<Mutex<()>> {
    // Fast path: look the lock up under the shared read guard. The guard is a
    // temporary of this statement, so it is released before the write guard
    // below is requested.
    let existing = self.discovery_locks.read().await.get(context).cloned();
    if let Some(lock) = existing {
        return lock;
    }

    // Slow path: take the write guard and go through `entry`, which checks the
    // map again so a lock inserted by another task in the meantime is reused
    // rather than replaced.
    let mut lock_table = self.discovery_locks.write().await;
    let slot = lock_table
        .entry(context.to_owned())
        .or_insert_with(|| Arc::new(Mutex::new(())));
    Arc::clone(slot)
}

/// Initialize the pool by connecting to the default context and discovering resources.
///
/// Subscribe to `progress()` before calling this to receive status updates.
pub async fn initialize(&self) -> Result<()> {
Expand Down Expand Up @@ -258,6 +281,8 @@ impl K8sClientPool {
/// c. Otherwise, run parallel discovery and cache by fingerprint
async fn discover_resources_for_context(&self, context: &str, force: bool) -> Result<()> {
let start = std::time::Instant::now();
let discovery_lock = self.discovery_lock_for_context(context).await;
let _discovery_guard = discovery_lock.lock().await;

// Check if already in memory and not expired
if !force {
Expand Down Expand Up @@ -1046,6 +1071,9 @@ impl K8sClientPool {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;

/// Helper function to test alias building logic in isolation
/// This replicates the logic from process_discovered_crds
fn build_aliases(
Expand All @@ -1065,6 +1093,35 @@ mod tests {
aliases
}

// Checks three properties of the per-context discovery locks:
//   1. the same context always maps to the same mutex, different contexts do not;
//   2. holding one context's lock blocks a second locker for that context while
//      leaving other contexts' locks available;
//   3. once the holder releases, the blocked waiter acquires the lock and finishes.
#[tokio::test]
async fn discovery_lock_is_per_context() {
    let pool = super::K8sClientPool::new_for_test(crate::progress::create_progress_handle());
    let first = pool.discovery_lock_for_context("prod").await;
    let second = pool.discovery_lock_for_context("prod").await;
    let other = pool.discovery_lock_for_context("staging").await;

    assert!(Arc::ptr_eq(&first, &second));
    assert!(!Arc::ptr_eq(&first, &other));

    let guard = first.lock().await;
    let mut same_context_waiter = tokio::spawn(async move {
        let _guard = second.lock().await;
    });

    // Poll the handle by mutable reference so it survives the timeout and can
    // be joined below instead of being dropped (which would detach the task).
    assert!(
        tokio::time::timeout(Duration::from_millis(20), &mut same_context_waiter)
            .await
            .is_err(),
        "same-context discovery lock should block concurrent discovery"
    );

    let _other_guard = tokio::time::timeout(Duration::from_millis(20), other.lock())
        .await
        .expect("different-context discovery lock should remain available");

    drop(guard);

    // With the guard released the waiter must make progress. The generous
    // timeout only bounds the failure case; success returns immediately.
    tokio::time::timeout(Duration::from_secs(1), same_context_waiter)
        .await
        .expect("same-context waiter should acquire the lock once it is released")
        .expect("same-context waiter task should not panic");
}

#[test]
fn test_singular_alias_different_from_plural() {
// CRD with plural "certificates" and singular "certificate"
Expand Down