From db3c7c15f409eb77acbdb1fe0a7488f6b92d6549 Mon Sep 17 00:00:00 2001 From: Yaroslav Borbat Date: Tue, 10 Mar 2026 12:51:13 +0300 Subject: [PATCH] fix(dra): handle parent watcher channel close in wrapWatcher Signed-off-by: Yaroslav Borbat --- .../plugin/wrapresourceslice/watcher.go | 18 +++++++++++++++--- .../virtualization-dra/internal/usb/store.go | 9 +++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/images/virtualization-dra/internal/plugin/wrapresourceslice/watcher.go b/images/virtualization-dra/internal/plugin/wrapresourceslice/watcher.go index 5c1fadf7b4..4621af7954 100644 --- a/images/virtualization-dra/internal/plugin/wrapresourceslice/watcher.go +++ b/images/virtualization-dra/internal/plugin/wrapresourceslice/watcher.go @@ -22,7 +22,7 @@ import ( "k8s.io/apimachinery/pkg/watch" ) -func newWrapWatcher(ctx context.Context, w watch.Interface, match func(event watch.Event) bool) watch.Interface { +func newWrapWatcher(ctx context.Context, w watch.Interface, match func(event watch.Event) bool) *wrapWatcher { ctx, cancel := context.WithCancel(ctx) watcher := &wrapWatcher{ @@ -31,6 +31,7 @@ func newWrapWatcher(ctx context.Context, w watch.Interface, match func(event wat ctx: ctx, cancel: cancel, result: make(chan watch.Event), + done: make(chan struct{}), } go watcher.receive(ctx) @@ -44,17 +45,27 @@ type wrapWatcher struct { ctx context.Context cancel context.CancelFunc result chan watch.Event + done chan struct{} } func (w *wrapWatcher) receive(ctx context.Context) { + defer close(w.result) + defer close(w.done) resultChan := w.watcher.ResultChan() for { select { case <-ctx.Done(): return - case event := <-resultChan: + case event, ok := <-resultChan: + if !ok { + return + } if w.match == nil || w.match(event) { - w.result <- event + select { + case <-ctx.Done(): + return + case w.result <- event: + } } } } @@ -71,4 +82,5 @@ func (w *wrapWatcher) Stop() { w.watcher.Stop() w.cancel() } + <-w.done } diff --git a/images/virtualization-dra/internal/usb/store.go b/images/virtualization-dra/internal/usb/store.go index a163b8fa8b..56875d44dc 100644 --- a/images/virtualization-dra/internal/usb/store.go +++ b/images/virtualization-dra/internal/usb/store.go @@ -480,10 +480,6 @@ func (s *AllocationStore) Unprepare(_ context.Context, claimUID types.UID) error s.mu.Lock() defer s.mu.Unlock() - if err := s.cdi.DeleteClaimSpecFile(string(claimUID)); err != nil { - return fmt.Errorf("unable to delete CDI spec file for claim: %w", err) - } - usbGatewayEnabled := featuregates.Default().USBGatewayEnabled() allocatedDevices := s.resourceClaimAllocations[claimUID] @@ -501,6 +497,11 @@ func (s *AllocationStore) Unprepare(_ context.Context, claimUID types.UID) error s.allocatedDevices.Delete(device) } + + if err := s.cdi.DeleteClaimSpecFile(string(claimUID)); err != nil { + return fmt.Errorf("unable to delete CDI spec file for claim: %w", err) + } + delete(s.resourceClaimAllocations, claimUID) return nil