From c4f32c1c51c07945cc1a612e47bc3f582cb2d776 Mon Sep 17 00:00:00 2001
From: Shreyansh Sancheti
Date: Tue, 21 Apr 2026 22:55:50 +0530
Subject: [PATCH 1/2] bridge: add guest-side reconnect loop for live migration
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

During live migration the vsock connection between the host and the GCS
breaks when the VM moves to the destination node. The GCS bridge
connection drops and cannot recover, leaving the guest unable to
communicate with the new host.

This adds a reconnect loop in cmd/gcs/main.go that re-dials the host
after a connection loss; the Host state (containers, processes)
persists across reconnections.

A Publisher abstraction is added to bridge/publisher.go so that
container wait goroutines spawned during CreateContainer can route exit
notifications through the current bridge. When the bridge is down
between reconnect iterations, notifications are queued with a warning
and drained once the bridge reconnects; the host-side shim also
re-queries container state after reconnecting.

The defer ordering in ListenAndServe is fixed so that quitChan closes
before responseChan becomes invalid. responseChan is no longer closed
on teardown and is buffered, so PublishNotification neither blocks nor
panics when the bridge goes down.

Tested with Invoke-FullLmTestCycle on a two-node Hyper-V live migration
setup (Node_1 -> Node_2). Migration completes at 100% and container
exec works on the destination node after migration.

Signed-off-by: Shreyansh Sancheti
---
 cmd/gcs/main.go                         | 78 +++++++++++++++----------
 internal/guest/bridge/bridge.go         | 73 ++++++++++++++++++++---
 internal/guest/bridge/bridge_v2.go      |  2 +-
 internal/guest/bridge/publisher.go      | 56 ++++++++++++++++++
 internal/guest/bridge/publisher_test.go | 46 +++++++++++++++
 5 files changed, 217 insertions(+), 38 deletions(-)
 create mode 100644 internal/guest/bridge/publisher.go
 create mode 100644 internal/guest/bridge/publisher_test.go

diff --git a/cmd/gcs/main.go b/cmd/gcs/main.go
index 68ab28d499..66af91873d 100644
--- a/cmd/gcs/main.go
+++ b/cmd/gcs/main.go
@@ -385,35 +385,23 @@ func main() {
 	if err != nil {
 		logrus.WithError(err).Fatal("failed to initialize new runc runtime")
 	}
-	mux := bridge.NewBridgeMux()
-	b := bridge.Bridge{
-		Handler:  mux,
-		EnableV4: *v4,
-	}
+
 	h := hcsv2.NewHost(rtime, tport, initialEnforcer, logWriter)
 
 	// Initialize virtual pod support in the host
 	if err := h.InitializeVirtualPodSupport(virtualPodsControl); err != nil {
 		logrus.WithError(err).Warn("Virtual pod support initialization failed")
 	}
-	b.AssignHandlers(mux, h)
 
-	var bridgeIn io.ReadCloser
-	var bridgeOut io.WriteCloser
-	if *useInOutErr {
-		bridgeIn = os.Stdin
-		bridgeOut = os.Stdout
-	} else {
-		const commandPort uint32 = 0x40000000
-		bridgeCon, err := tport.Dial(commandPort)
-		if err != nil {
-			logrus.WithFields(logrus.Fields{
-				"port":          commandPort,
-				logrus.ErrorKey: err,
-			}).Fatal("failed to dial host vsock connection")
-		}
-		bridgeIn = bridgeCon
-		bridgeOut = bridgeCon
-	}
+	const commandPort uint32 = 0x40000000
+
+	// Reconnect loop: on each iteration we create a fresh bridge+mux, dial the
+	// host, and serve until the connection drops. After a live migration the
+	// vsock connection breaks; we re-dial and continue.
+	//
+	// During live migration the VM is frozen and only wakes up when the host
+	// shim is ready, so the vsock port should be immediately available. We
+	// use a tight retry interval instead of exponential backoff.
+	const reconnectInterval = 100 * time.Millisecond
 	event := cgroups1.MemoryThresholdEvent(*gcsMemLimitBytes, false)
 	gefd, err := gcsControl.RegisterMemoryEvent(event)
@@ -430,7 +418,6 @@ func main() {
 	oomFile := os.NewFile(oom, "cefd")
 	defer oomFile.Close()
-	// Setup OOM monitoring for virtual-pods cgroup
 	virtualPodsOom, err := virtualPodsControl.OOMEventFD()
 	if err != nil {
 		logrus.WithError(err).Fatal("failed to retrieve the virtual-pods cgroups oom eventfd")
 	}
@@ -438,7 +425,6 @@
 	virtualPodsOomFile := os.NewFile(virtualPodsOom, "vp-oomfd")
 	defer virtualPodsOomFile.Close()
-	// time synchronization service
 	if !(*disableTimeSync) {
 		if err = startTimeSyncService(); err != nil {
 			logrus.WithError(err).Fatal("failed to start time synchronization service")
 		}
 	}
@@ -448,10 +434,42 @@
 	go readMemoryEvents(startTime, gefdFile, "/gcs", int64(*gcsMemLimitBytes), gcsControl)
 	go readMemoryEvents(startTime, oomFile, "/containers", containersLimit, containersControl)
 	go readMemoryEvents(startTime, virtualPodsOomFile, "/containers/virtual-pods", containersLimit, virtualPodsControl)
-	err = b.ListenAndServe(bridgeIn, bridgeOut)
-	if err != nil {
-		logrus.WithFields(logrus.Fields{
-			logrus.ErrorKey: err,
-		}).Fatal("failed to serve gcs service")
+
+	mux := bridge.NewBridgeMux()
+	b := bridge.New(mux, *v4)
+	b.AssignHandlers(mux, h)
+
+	// Reconnect loop: dial the host, serve until the connection drops, then
+	// re-dial. During live migration the VM is frozen and only wakes up when
+	// the destination host shim is ready, so the vsock port should be
+	// immediately available.
+	for {
+		var bridgeIn io.ReadCloser
+		var bridgeOut io.WriteCloser
+		if *useInOutErr {
+			bridgeIn = os.Stdin
+			bridgeOut = os.Stdout
+		} else {
+			bridgeCon, dialErr := tport.Dial(commandPort)
+			if dialErr != nil {
+				logrus.WithError(dialErr).Warn("failed to dial host, retrying")
+				time.Sleep(reconnectInterval)
+				continue
+			}
+			bridgeIn = bridgeCon
+			bridgeOut = bridgeCon
+		}
+
+		logrus.Info("bridge connected, serving")
+
+		serveErr := b.ListenAndServe(bridgeIn, bridgeOut)
+
+		if b.ShutdownRequested() {
+			logrus.Info("bridge shutdown requested, exiting reconnect loop")
+			break
+		}
+
+		logrus.WithError(serveErr).Warn("bridge connection lost, will reconnect")
+		time.Sleep(reconnectInterval)
 	}
 }
diff --git a/internal/guest/bridge/bridge.go b/internal/guest/bridge/bridge.go
index 4ea03ed104..34d133f529 100644
--- a/internal/guest/bridge/bridge.go
+++ b/internal/guest/bridge/bridge.go
@@ -189,6 +189,43 @@ type Bridge struct {
 	hasQuitPending atomic.Bool
 
 	protVer prot.ProtocolVersion
+
+	// publisher is a stable notification sink that survives bridge recreation
+	// during live migration. Notifications are queued when the bridge is down
+	// and drained when it reconnects.
+	publisher *publisher
 }
+
+// responseChanBuffer is the buffer size for the bridge response channel.
+const responseChanBuffer = 16
+
+// New creates a Bridge with a notification publisher that queues container
+// exit notifications when the bridge is disconnected and drains them when
+// it reconnects.
+func New(handler Handler, enableV4 bool) *Bridge {
+	return &Bridge{
+		Handler:   handler,
+		EnableV4:  enableV4,
+		publisher: newPublisher(),
+	}
+}
+
+// Connect attaches the publisher to this bridge so that container exit
+// notifications are delivered. Any notifications queued while disconnected
+// are drained immediately.
+func (b *Bridge) Connect() {
+	b.publisher.setBridge(b)
+}
+
+// Disconnect detaches the publisher. Notifications arriving while
+// disconnected are queued until the next Connect.
+func (b *Bridge) Disconnect() {
+	b.publisher.setBridge(nil)
+}
+
+// ShutdownRequested returns true if the bridge has been asked to shut down.
+func (b *Bridge) ShutdownRequested() bool {
+	return b.hasQuitPending.Load()
+}
 
 // AssignHandlers creates and assigns the appropriate bridge
@@ -226,17 +263,25 @@ func (b *Bridge) AssignHandlers(mux *Mux, host *hcsv2.Host) {
 func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser) error {
 	requestChan := make(chan *Request)
 	requestErrChan := make(chan error)
-	b.responseChan = make(chan bridgeResponse)
+	b.responseChan = make(chan bridgeResponse, responseChanBuffer)
 	responseErrChan := make(chan error)
 	b.quitChan = make(chan bool)
 
-	defer close(b.quitChan)
+	// Attach the publisher now that responseChan is valid. Queued
+	// notifications from a previous connection are drained here.
+	b.publisher.setBridge(b)
+
+	// Defers execute in LIFO order. quitChan is deferred last so it
+	// closes first, letting PublishNotification's select see it before
+	// responseChan becomes invalid. responseChan is not explicitly closed;
+	// the response writer goroutine exits when the bridge connection drops.
+	defer bridgeIn.Close()
+	defer close(requestErrChan)
+	defer close(requestChan)
 	defer bridgeOut.Close()
 	defer close(responseErrChan)
-	defer close(b.responseChan)
-	defer close(requestChan)
-	defer close(requestErrChan)
-	defer bridgeIn.Close()
+	defer b.publisher.setBridge(nil)
+	defer close(b.quitChan)
 
 	// Receive bridge requests and schedule them to be processed.
 	go func() {
@@ -440,7 +485,20 @@ func (b *Bridge) PublishNotification(n *prot.ContainerNotification) {
 		},
 		response: n,
 	}
-	b.responseChan <- resp
+	// Check quitChan first to avoid sending to a dead bridge.
+	select {
+	case <-b.quitChan:
+		logrus.WithField("containerID", n.ContainerID).
+			Warn("bridge quit, dropping notification")
+		return
+	default:
+	}
+	select {
+	case b.responseChan <- resp:
+	case <-b.quitChan:
+		logrus.WithField("containerID", n.ContainerID).
+			Warn("bridge quit, dropping notification")
+	}
 }
 
 // setErrorForResponseBase modifies the passed-in MessageResponseBase to
diff --git a/internal/guest/bridge/bridge_v2.go b/internal/guest/bridge/bridge_v2.go
index 800094e549..1789a7b53f 100644
--- a/internal/guest/bridge/bridge_v2.go
+++ b/internal/guest/bridge/bridge_v2.go
@@ -123,7 +123,7 @@ func (b *Bridge) createContainerV2(r *Request) (_ RequestResponse, err error) {
 			Result:     0,
 			ResultInfo: "",
 		}
-		b.PublishNotification(notification)
+		b.publisher.publish(notification)
 	}()
 
 	return &prot.ContainerCreateResponse{}, nil
diff --git a/internal/guest/bridge/publisher.go b/internal/guest/bridge/publisher.go
new file mode 100644
index 0000000000..755e78f7e7
--- /dev/null
+++ b/internal/guest/bridge/publisher.go
@@ -0,0 +1,56 @@
+//go:build linux
+
+package bridge
+
+import (
+	"sync"
+
+	"github.com/Microsoft/hcsshim/internal/guest/prot"
+	"github.com/sirupsen/logrus"
+)
+
+// publisher provides a stable reference for container exit goroutines
+// to send notifications through. It survives bridge recreation during
+// live migration. When the bridge is nil, notifications are queued and
+// drained when a new bridge is attached.
+type publisher struct {
+	mu      sync.Mutex
+	b       *Bridge
+	pending []*prot.ContainerNotification
+}
+
+// newPublisher creates a publisher with no bridge attached.
+func newPublisher() *publisher { + return &publisher{} +} + +// setBridge attaches or detaches the current bridge. When a non-nil bridge +// is set, any queued notifications are drained through it immediately. +func (p *publisher) setBridge(b *Bridge) { + p.mu.Lock() + defer p.mu.Unlock() + p.b = b + if b != nil { + for _, n := range p.pending { + logrus.WithField("containerID", n.ContainerID). + Info("draining queued container notification") + b.PublishNotification(n) + } + p.pending = nil + } +} + +// publish sends a container notification to the current bridge. +// If no bridge is connected, the notification is queued for delivery +// when the next bridge is set. +func (p *publisher) publish(n *prot.ContainerNotification) { + p.mu.Lock() + defer p.mu.Unlock() + if p.b == nil { + logrus.WithField("containerID", n.ContainerID). + Warn("bridge not connected, queueing container notification") + p.pending = append(p.pending, n) + return + } + p.b.PublishNotification(n) +} diff --git a/internal/guest/bridge/publisher_test.go b/internal/guest/bridge/publisher_test.go new file mode 100644 index 0000000000..dbee1cdac0 --- /dev/null +++ b/internal/guest/bridge/publisher_test.go @@ -0,0 +1,46 @@ +//go:build linux + +package bridge + +import ( + "testing" + + "github.com/Microsoft/hcsshim/internal/guest/prot" +) + +func TestPublisher_NilBridgeQueues(t *testing.T) { + p := newPublisher() + p.publish(&prot.ContainerNotification{ + MessageBase: prot.MessageBase{ContainerID: "test"}, + }) + p.mu.Lock() + if len(p.pending) != 1 { + t.Fatalf("expected 1 queued notification, got %d", len(p.pending)) + } + p.mu.Unlock() +} + +func TestPublisher_SetBridgeDrains(t *testing.T) { + p := newPublisher() + // Queue two notifications while disconnected. + p.publish(&prot.ContainerNotification{ + MessageBase: prot.MessageBase{ContainerID: "c1"}, + }) + p.publish(&prot.ContainerNotification{ + MessageBase: prot.MessageBase{ContainerID: "c2"}, + }) + + p.mu.Lock() + if len(p.pending) != 2 { + t.Fatalf("expected 2 queued, got %d", len(p.pending)) + } + p.mu.Unlock() + + // setBridge(nil) should not drain. + p.setBridge(nil) + p.mu.Lock() + if len(p.pending) != 2 { + t.Fatalf("expected 2 still queued after setBridge(nil), got %d", len(p.pending)) + } + p.mu.Unlock() +} From 30d3ca40bdf0d75b2d3745e2f95a55b31f6f6bec Mon Sep 17 00:00:00 2001 From: Shreyansh Sancheti Date: Mon, 27 Apr 2026 20:59:46 +0530 Subject: [PATCH 2/2] bridge: inline publisher into Bridge, add reconnect unit tests - Inline publisher fields (notifyMu, connected, pendingNotifications) and methods (publishNotification, drainPendingNotifications, disconnectNotifications) directly into the Bridge struct, removing the separate publisher.go file. - Simplify PublishNotification to a direct channel send now that the publish/disconnect pattern ensures safe access. - Add LinuxGcsVsockPort constant to internal/guest/prot for use by the GCS reconnect loop (cannot import from internal/gcs/prot due to windows build tag). - Add 4 unit tests covering notification queuing, drain-on-reconnect, disconnect-after-drain, and full reconnect cycle. - Remove exported Connect/Disconnect methods (now handled internally by ListenAndServe lifecycle). 
Signed-off-by: Shreyansh Sancheti --- cmd/gcs/main.go | 11 +- internal/guest/bridge/bridge.go | 107 +++++++----- .../guest/bridge/bridge_reconnect_test.go | 165 ++++++++++++++++++ internal/guest/bridge/bridge_v2.go | 2 +- internal/guest/bridge/publisher.go | 56 ------ internal/guest/bridge/publisher_test.go | 46 ----- internal/guest/prot/protocol.go | 5 + 7 files changed, 235 insertions(+), 157 deletions(-) create mode 100644 internal/guest/bridge/bridge_reconnect_test.go delete mode 100644 internal/guest/bridge/publisher.go delete mode 100644 internal/guest/bridge/publisher_test.go diff --git a/cmd/gcs/main.go b/cmd/gcs/main.go index 66af91873d..c21ea167fa 100644 --- a/cmd/gcs/main.go +++ b/cmd/gcs/main.go @@ -24,6 +24,7 @@ import ( "github.com/Microsoft/hcsshim/internal/guest/bridge" "github.com/Microsoft/hcsshim/internal/guest/cgroup" "github.com/Microsoft/hcsshim/internal/guest/kmsg" + "github.com/Microsoft/hcsshim/internal/guest/prot" "github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2" "github.com/Microsoft/hcsshim/internal/guest/runtime/runc" "github.com/Microsoft/hcsshim/internal/guest/transport" @@ -392,12 +393,6 @@ func main() { logrus.WithError(err).Warn("Virtual pod support initialization failed") } - const commandPort uint32 = 0x40000000 - - // Reconnect loop: on each iteration we create a fresh bridge+mux, dial the - // host, and serve until the connection drops. After a live migration the - // vsock connection breaks; we re-dial and continue. - // // During live migration the VM is frozen and only wakes up when the host // shim is ready, so the vsock port should be immediately available. We // use a tight retry interval instead of exponential backoff. @@ -418,6 +413,7 @@ func main() { oomFile := os.NewFile(oom, "cefd") defer oomFile.Close() + // Setup OOM monitoring for virtual-pods cgroup virtualPodsOom, err := virtualPodsControl.OOMEventFD() if err != nil { logrus.WithError(err).Fatal("failed to retrieve the virtual-pods cgroups oom eventfd") @@ -425,6 +421,7 @@ func main() { virtualPodsOomFile := os.NewFile(virtualPodsOom, "vp-oomfd") defer virtualPodsOomFile.Close() + // time synchronization service if !(*disableTimeSync) { if err = startTimeSyncService(); err != nil { logrus.WithError(err).Fatal("failed to start time synchronization service") @@ -450,7 +447,7 @@ func main() { bridgeIn = os.Stdin bridgeOut = os.Stdout } else { - bridgeCon, dialErr := tport.Dial(commandPort) + bridgeCon, dialErr := tport.Dial(prot.LinuxGcsVsockPort) if dialErr != nil { logrus.WithError(dialErr).Warn("failed to dial host, retrying") time.Sleep(reconnectInterval) diff --git a/internal/guest/bridge/bridge.go b/internal/guest/bridge/bridge.go index 34d133f529..d8d57e786d 100644 --- a/internal/guest/bridge/bridge.go +++ b/internal/guest/bridge/bridge.go @@ -190,37 +190,65 @@ type Bridge struct { protVer prot.ProtocolVersion - // publisher is a stable notification sink that survives bridge recreation - // during live migration. Notifications are queued when the bridge is down - // and drained when it reconnects. - publisher *publisher + // notifyMu guards connected and pendingNotifications. + notifyMu sync.Mutex + // connected is true while ListenAndServe is running and responseChan + // is valid. When false, notifications are queued in pendingNotifications. + connected bool + // pendingNotifications holds container exit notifications that arrived + // while the bridge was disconnected. They are drained when ListenAndServe + // reconnects. 
+	pendingNotifications []*prot.ContainerNotification
 }
 
-// responseChanBuffer is the buffer size for the bridge response channel.
-const responseChanBuffer = 16
-
-// New creates a Bridge with a notification publisher that queues container
-// exit notifications when the bridge is disconnected and drains them when
-// it reconnects.
+// New creates a Bridge. Container exit notifications are queued while
+// disconnected and drained on reconnect.
 func New(handler Handler, enableV4 bool) *Bridge {
 	return &Bridge{
-		Handler:   handler,
-		EnableV4:  enableV4,
-		publisher: newPublisher(),
+		Handler:  handler,
+		EnableV4: enableV4,
+	}
+}
+
+// publishNotification routes a container exit notification to the bridge.
+// If the bridge is disconnected, the notification is queued and will be
+// drained when ListenAndServe reconnects.
+func (b *Bridge) publishNotification(n *prot.ContainerNotification) {
+	b.notifyMu.Lock()
+	if !b.connected {
+		logrus.WithField("containerID", n.ContainerID).
+			Warn("bridge not connected, queueing container notification")
+		b.pendingNotifications = append(b.pendingNotifications, n)
+		b.notifyMu.Unlock()
+		return
 	}
+	b.notifyMu.Unlock()
+	b.PublishNotification(n)
 }
 
-// Connect attaches the publisher to this bridge so that container exit
-// notifications are delivered. Any notifications queued while disconnected
-// are drained immediately.
-func (b *Bridge) Connect() {
-	b.publisher.setBridge(b)
+// drainPendingNotifications sends any queued notifications through the bridge.
+// Must be called after responseChan is valid and the response writer goroutine
+// is running.
+func (b *Bridge) drainPendingNotifications() {
+	b.notifyMu.Lock()
+	b.connected = true
+	drain := b.pendingNotifications
+	b.pendingNotifications = nil
+	b.notifyMu.Unlock()
+
+	for _, n := range drain {
+		logrus.WithField("containerID", n.ContainerID).
+			Info("draining queued container notification")
+		b.PublishNotification(n)
+	}
 }
 
-// Disconnect detaches the publisher. Notifications arriving while
-// disconnected are queued until the next Connect.
-func (b *Bridge) Disconnect() {
-	b.publisher.setBridge(nil)
+// disconnectNotifications marks the bridge as disconnected so future
+// publishNotification calls queue instead of sending.
+func (b *Bridge) disconnectNotifications() {
+	b.notifyMu.Lock()
+	b.connected = false
+	b.notifyMu.Unlock()
 }
 
 // ShutdownRequested returns true if the bridge has been asked to shut down.
@@ -263,24 +291,20 @@ func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser) error {
 	requestChan := make(chan *Request)
 	requestErrChan := make(chan error)
-	b.responseChan = make(chan bridgeResponse, responseChanBuffer)
+	b.responseChan = make(chan bridgeResponse)
 	responseErrChan := make(chan error)
 	b.quitChan = make(chan bool)
 
-	// Attach the publisher now that responseChan is valid. Queued
-	// notifications from a previous connection are drained here.
-	b.publisher.setBridge(b)
-
-	// Defers execute in LIFO order. quitChan is deferred last so it
-	// closes first, letting PublishNotification's select see it before
-	// responseChan becomes invalid. responseChan is not explicitly closed;
-	// the response writer goroutine exits when the bridge connection drops.
+	// Defers execute in LIFO order. We close quitChan first to stop the
+	// read loop, then disconnect notifications (so any late publishNotification
+	// calls queue instead of sending on a responseChan that no longer has a
+	// reader), then tear down the remaining channels and connections.
 	defer bridgeIn.Close()
 	defer close(requestErrChan)
 	defer close(requestChan)
 	defer bridgeOut.Close()
 	defer close(responseErrChan)
-	defer b.publisher.setBridge(nil)
-	defer close(b.quitChan)
+	defer b.disconnectNotifications()
 	defer close(b.quitChan)
 
 	// Receive bridge requests and schedule them to be processed.
 	go func() {
@@ -441,6 +465,9 @@ func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser
 		responseErrChan <- resperr
 	}()
 
+	// Drain queued notifications now that the response writer is running.
+	b.drainPendingNotifications()
+
 	select {
 	case err := <-requestErrChan:
 		return err
@@ -485,20 +512,7 @@ func (b *Bridge) PublishNotification(n *prot.ContainerNotification) {
 		},
 		response: n,
 	}
-	// Check quitChan first to avoid sending to a dead bridge.
-	select {
-	case <-b.quitChan:
-		logrus.WithField("containerID", n.ContainerID).
-			Warn("bridge quit, dropping notification")
-		return
-	default:
-	}
-	select {
-	case b.responseChan <- resp:
-	case <-b.quitChan:
-		logrus.WithField("containerID", n.ContainerID).
-			Warn("bridge quit, dropping notification")
-	}
+	b.responseChan <- resp
 }
 
 // setErrorForResponseBase modifies the passed-in MessageResponseBase to
diff --git a/internal/guest/bridge/bridge_reconnect_test.go b/internal/guest/bridge/bridge_reconnect_test.go
new file mode 100644
index 0000000000..2a8c8b702a
--- /dev/null
+++ b/internal/guest/bridge/bridge_reconnect_test.go
@@ -0,0 +1,165 @@
+//go:build linux
+
+package bridge
+
+import (
+	"io"
+	"testing"
+	"time"
+
+	"github.com/Microsoft/hcsshim/internal/guest/prot"
+)
+
+func TestBridge_NotificationQueuedWhenDisconnected(t *testing.T) {
+	b := New(nil, false)
+	// Bridge starts disconnected (connected == false).
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "c1"},
+	})
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "c2"},
+	})
+
+	b.notifyMu.Lock()
+	if len(b.pendingNotifications) != 2 {
+		t.Fatalf("expected 2 queued, got %d", len(b.pendingNotifications))
+	}
+	b.notifyMu.Unlock()
+}
+
+func TestBridge_DrainOnReconnect(t *testing.T) {
+	b := New(nil, false)
+
+	// Queue notifications while disconnected.
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "c1"},
+	})
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "c2"},
+	})
+
+	// Simulate what ListenAndServe does: create channels, start writer,
+	// then drain.
+	b.responseChan = make(chan bridgeResponse, 4)
+
+	b.drainPendingNotifications()
+
+	// Collect drained notifications.
+ var ids []string + for i := 0; i < 2; i++ { + select { + case resp := <-b.responseChan: + n := resp.response.(*prot.ContainerNotification) + ids = append(ids, n.ContainerID) + case <-time.After(time.Second): + t.Fatalf("timed out waiting for notification %d", i) + } + } + if len(ids) != 2 { + t.Fatalf("expected 2 drained notifications, got %d", len(ids)) + } + + b.notifyMu.Lock() + if len(b.pendingNotifications) != 0 { + t.Fatalf("expected 0 pending after drain, got %d", len(b.pendingNotifications)) + } + b.notifyMu.Unlock() +} + +func TestBridge_DisconnectQueuesAfterDrain(t *testing.T) { + b := New(nil, false) + b.responseChan = make(chan bridgeResponse, 4) + + // Drain with nothing pending — just sets connected = true. + b.drainPendingNotifications() + + // Send while connected — goes directly to responseChan. + b.publishNotification(&prot.ContainerNotification{ + MessageBase: prot.MessageBase{ContainerID: "live"}, + }) + + select { + case resp := <-b.responseChan: + n := resp.response.(*prot.ContainerNotification) + if n.ContainerID != "live" { + t.Fatalf("expected 'live', got %q", n.ContainerID) + } + default: + t.Fatal("expected notification on responseChan") + } + + // Disconnect — future notifications should queue. + b.disconnectNotifications() + + b.publishNotification(&prot.ContainerNotification{ + MessageBase: prot.MessageBase{ContainerID: "queued"}, + }) + + b.notifyMu.Lock() + if len(b.pendingNotifications) != 1 { + t.Fatalf("expected 1 queued after disconnect, got %d", len(b.pendingNotifications)) + } + b.notifyMu.Unlock() + + // Nothing should be on responseChan. + select { + case <-b.responseChan: + t.Fatal("should not have received on responseChan after disconnect") + default: + } +} + +func TestBridge_FullReconnectCycle(t *testing.T) { + b := New(nil, false) + + // --- Iteration 1: simulate ListenAndServe --- + r1, w1 := io.Pipe() + b.responseChan = make(chan bridgeResponse, 4) + b.quitChan = make(chan bool) + + go func() { + for range b.responseChan { + } + }() // drain writer + + b.drainPendingNotifications() + + // Send a notification while connected. + b.publishNotification(&prot.ContainerNotification{ + MessageBase: prot.MessageBase{ContainerID: "iter1"}, + }) + + // Simulate bridge drop — disconnect, close channels. 
+ b.disconnectNotifications() + close(b.quitChan) + close(b.responseChan) + r1.Close() + w1.Close() + + // --- Between iterations: container exits --- + b.publishNotification(&prot.ContainerNotification{ + MessageBase: prot.MessageBase{ContainerID: "between"}, + }) + + b.notifyMu.Lock() + if len(b.pendingNotifications) != 1 || b.pendingNotifications[0].ContainerID != "between" { + t.Fatalf("expected 'between' queued, got %v", b.pendingNotifications) + } + b.notifyMu.Unlock() + + // --- Iteration 2: reconnect --- + b.responseChan = make(chan bridgeResponse, 4) + b.quitChan = make(chan bool) + + b.drainPendingNotifications() + + select { + case resp := <-b.responseChan: + n := resp.response.(*prot.ContainerNotification) + if n.ContainerID != "between" { + t.Fatalf("expected 'between', got %q", n.ContainerID) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for drained notification") + } +} diff --git a/internal/guest/bridge/bridge_v2.go b/internal/guest/bridge/bridge_v2.go index 1789a7b53f..dc13ec6428 100644 --- a/internal/guest/bridge/bridge_v2.go +++ b/internal/guest/bridge/bridge_v2.go @@ -123,7 +123,7 @@ func (b *Bridge) createContainerV2(r *Request) (_ RequestResponse, err error) { Result: 0, ResultInfo: "", } - b.publisher.publish(notification) + b.publishNotification(notification) }() return &prot.ContainerCreateResponse{}, nil diff --git a/internal/guest/bridge/publisher.go b/internal/guest/bridge/publisher.go deleted file mode 100644 index 755e78f7e7..0000000000 --- a/internal/guest/bridge/publisher.go +++ /dev/null @@ -1,56 +0,0 @@ -//go:build linux - -package bridge - -import ( - "sync" - - "github.com/Microsoft/hcsshim/internal/guest/prot" - "github.com/sirupsen/logrus" -) - -// publisher provides a stable reference for container exit goroutines -// to send notifications through. It survives bridge recreation during -// live migration. When the bridge is nil, notifications are queued and -// drained when a new bridge is attached. -type publisher struct { - mu sync.Mutex - b *Bridge - pending []*prot.ContainerNotification -} - -// newPublisher creates a publisher with no bridge attached. -func newPublisher() *publisher { - return &publisher{} -} - -// setBridge attaches or detaches the current bridge. When a non-nil bridge -// is set, any queued notifications are drained through it immediately. -func (p *publisher) setBridge(b *Bridge) { - p.mu.Lock() - defer p.mu.Unlock() - p.b = b - if b != nil { - for _, n := range p.pending { - logrus.WithField("containerID", n.ContainerID). - Info("draining queued container notification") - b.PublishNotification(n) - } - p.pending = nil - } -} - -// publish sends a container notification to the current bridge. -// If no bridge is connected, the notification is queued for delivery -// when the next bridge is set. -func (p *publisher) publish(n *prot.ContainerNotification) { - p.mu.Lock() - defer p.mu.Unlock() - if p.b == nil { - logrus.WithField("containerID", n.ContainerID). 
- Warn("bridge not connected, queueing container notification") - p.pending = append(p.pending, n) - return - } - p.b.PublishNotification(n) -} diff --git a/internal/guest/bridge/publisher_test.go b/internal/guest/bridge/publisher_test.go deleted file mode 100644 index dbee1cdac0..0000000000 --- a/internal/guest/bridge/publisher_test.go +++ /dev/null @@ -1,46 +0,0 @@ -//go:build linux - -package bridge - -import ( - "testing" - - "github.com/Microsoft/hcsshim/internal/guest/prot" -) - -func TestPublisher_NilBridgeQueues(t *testing.T) { - p := newPublisher() - p.publish(&prot.ContainerNotification{ - MessageBase: prot.MessageBase{ContainerID: "test"}, - }) - p.mu.Lock() - if len(p.pending) != 1 { - t.Fatalf("expected 1 queued notification, got %d", len(p.pending)) - } - p.mu.Unlock() -} - -func TestPublisher_SetBridgeDrains(t *testing.T) { - p := newPublisher() - // Queue two notifications while disconnected. - p.publish(&prot.ContainerNotification{ - MessageBase: prot.MessageBase{ContainerID: "c1"}, - }) - p.publish(&prot.ContainerNotification{ - MessageBase: prot.MessageBase{ContainerID: "c2"}, - }) - - p.mu.Lock() - if len(p.pending) != 2 { - t.Fatalf("expected 2 queued, got %d", len(p.pending)) - } - p.mu.Unlock() - - // setBridge(nil) should not drain. - p.setBridge(nil) - p.mu.Lock() - if len(p.pending) != 2 { - t.Fatalf("expected 2 still queued after setBridge(nil), got %d", len(p.pending)) - } - p.mu.Unlock() -} diff --git a/internal/guest/prot/protocol.go b/internal/guest/prot/protocol.go index 16e1f9daa3..ef88bd278e 100644 --- a/internal/guest/prot/protocol.go +++ b/internal/guest/prot/protocol.go @@ -39,6 +39,11 @@ const ( messageVersionShift = 0 ) +const ( + // LinuxGcsVsockPort is the vsock port the GCS connects to on the host. + LinuxGcsVsockPort uint32 = 0x40000000 +) + // MessageType is the type of the message. type MessageType uint32
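
For readers new to this pattern, the following standalone sketch shows the
queue-while-disconnected, drain-on-reconnect flow that the series implements
inside the bridge. It is illustrative only: the names (sink, publish, connect,
disconnect) are invented for the example and are not part of the hcsshim API
or of these patches.

package main

import (
	"fmt"
	"sync"
)

type notification struct{ containerID string }

// sink mirrors the idea of the bridge's notification path: a mutex-guarded
// connected flag decides between queueing and a direct channel send.
type sink struct {
	mu        sync.Mutex
	connected bool
	pending   []notification
	out       chan notification // stands in for the bridge response channel
}

// publish queues while disconnected and sends directly while connected.
func (s *sink) publish(n notification) {
	s.mu.Lock()
	if !s.connected {
		s.pending = append(s.pending, n)
		s.mu.Unlock()
		return
	}
	s.mu.Unlock()
	s.out <- n
}

// connect attaches a fresh output channel and drains anything queued.
func (s *sink) connect(out chan notification) {
	s.mu.Lock()
	s.connected = true
	s.out = out
	drain := s.pending
	s.pending = nil
	s.mu.Unlock()
	for _, n := range drain {
		out <- n
	}
}

// disconnect makes later publishes queue again.
func (s *sink) disconnect() {
	s.mu.Lock()
	s.connected = false
	s.mu.Unlock()
}

func main() {
	s := &sink{}
	s.publish(notification{"c1"}) // bridge down: queued

	out := make(chan notification, 4)
	s.connect(out) // reconnect: "c1" drained
	s.publish(notification{"c2"})

	s.disconnect()
	s.publish(notification{"c3"}) // queued until the next connect

	close(out)
	for n := range out {
		fmt.Println("delivered:", n.containerID)
	}
	fmt.Println("still queued:", len(s.pending))
}

The same invariant drives publishNotification and drainPendingNotifications
in patch 2: the connected flag and pending slice are guarded by one mutex,
and draining happens only after a consumer for the response channel is
running.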