From 1cd18eeaefb5cad7f386a3870cc9c954b1cd050f Mon Sep 17 00:00:00 2001 From: wuyangfan <1102042793@qq.com> Date: Sun, 17 May 2026 14:59:41 +0800 Subject: [PATCH] fix: serialize resource discovery per context --- src/kubernetes/client.rs | 59 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/src/kubernetes/client.rs b/src/kubernetes/client.rs index 15debb4..e9714f2 100644 --- a/src/kubernetes/client.rs +++ b/src/kubernetes/client.rs @@ -9,7 +9,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use tracing::{debug, info, trace, warn}; use super::ApiFilters; @@ -146,6 +146,10 @@ pub struct K8sClientPool { kubeconfig: Kubeconfig, clients: Arc>>, registries: Arc>>, + /// Per-context discovery locks prevent concurrent refreshes for the same context + /// from racing to overwrite the cached registry while still allowing different + /// contexts to discover in parallel. + discovery_locks: Arc>>>>, /// Current active contexts (supports multi-USE) current_contexts: Arc>>, /// Progress reporter for query status updates @@ -185,6 +189,7 @@ impl K8sClientPool { kubeconfig: Kubeconfig::default(), clients: Arc::new(RwLock::new(HashMap::new())), registries: Arc::new(RwLock::new(HashMap::new())), + discovery_locks: Arc::new(RwLock::new(HashMap::new())), current_contexts: Arc::new(RwLock::new(vec!["test-context".to_string()])), progress, resource_cache: ResourceCache::new().expect("Failed to create test cache"), @@ -213,12 +218,30 @@ impl K8sClientPool { kubeconfig, clients: Arc::new(RwLock::new(HashMap::new())), registries: Arc::new(RwLock::new(HashMap::new())), + discovery_locks: Arc::new(RwLock::new(HashMap::new())), current_contexts: Arc::new(RwLock::new(vec![context_name])), progress: crate::progress::create_progress_handle(), resource_cache: ResourceCache::new()?, }) } + /// Return the mutex used to serialize discovery for a single context. + async fn discovery_lock_for_context(&self, context: &str) -> Arc> { + { + let locks = self.discovery_locks.read().await; + if let Some(lock) = locks.get(context) { + return Arc::clone(lock); + } + } + + let mut locks = self.discovery_locks.write().await; + Arc::clone( + locks + .entry(context.to_string()) + .or_insert_with(|| Arc::new(Mutex::new(()))), + ) + } + /// Initialize the pool by connecting to the default context and discovering resources /// Subscribe to progress() before calling this to receive status updates pub async fn initialize(&self) -> Result<()> { @@ -258,6 +281,8 @@ impl K8sClientPool { /// c. Otherwise, run parallel discovery and cache by fingerprint async fn discover_resources_for_context(&self, context: &str, force: bool) -> Result<()> { let start = std::time::Instant::now(); + let discovery_lock = self.discovery_lock_for_context(context).await; + let _discovery_guard = discovery_lock.lock().await; // Check if already in memory and not expired if !force { @@ -1046,6 +1071,9 @@ impl K8sClientPool { #[cfg(test)] mod tests { + use std::sync::Arc; + use std::time::Duration; + /// Helper function to test alias building logic in isolation /// This replicates the logic from process_discovered_crds fn build_aliases( @@ -1065,6 +1093,35 @@ mod tests { aliases } + #[tokio::test] + async fn discovery_lock_is_per_context() { + let pool = super::K8sClientPool::new_for_test(crate::progress::create_progress_handle()); + let first = pool.discovery_lock_for_context("prod").await; + let second = pool.discovery_lock_for_context("prod").await; + let other = pool.discovery_lock_for_context("staging").await; + + assert!(Arc::ptr_eq(&first, &second)); + assert!(!Arc::ptr_eq(&first, &other)); + + let guard = first.lock().await; + let same_context_waiter = tokio::spawn(async move { + let _guard = second.lock().await; + }); + + assert!( + tokio::time::timeout(Duration::from_millis(20), same_context_waiter) + .await + .is_err(), + "same-context discovery lock should block concurrent discovery" + ); + + let _other_guard = tokio::time::timeout(Duration::from_millis(20), other.lock()) + .await + .expect("different-context discovery lock should remain available"); + + drop(guard); + } + #[test] fn test_singular_alias_different_from_plural() { // CRD with plural "certificates" and singular "certificate"