diff --git a/cmd/gcs/main.go b/cmd/gcs/main.go
index 68ab28d499..c21ea167fa 100644
--- a/cmd/gcs/main.go
+++ b/cmd/gcs/main.go
@@ -24,6 +24,7 @@ import (
 	"github.com/Microsoft/hcsshim/internal/guest/bridge"
 	"github.com/Microsoft/hcsshim/internal/guest/cgroup"
 	"github.com/Microsoft/hcsshim/internal/guest/kmsg"
+	"github.com/Microsoft/hcsshim/internal/guest/prot"
 	"github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2"
 	"github.com/Microsoft/hcsshim/internal/guest/runtime/runc"
 	"github.com/Microsoft/hcsshim/internal/guest/transport"
@@ -385,35 +386,17 @@ func main() {
 	if err != nil {
 		logrus.WithError(err).Fatal("failed to initialize new runc runtime")
 	}
-	mux := bridge.NewBridgeMux()
-	b := bridge.Bridge{
-		Handler:  mux,
-		EnableV4: *v4,
-	}
+
 	h := hcsv2.NewHost(rtime, tport, initialEnforcer, logWriter)
 	// Initialize virtual pod support in the host
 	if err := h.InitializeVirtualPodSupport(virtualPodsControl); err != nil {
 		logrus.WithError(err).Warn("Virtual pod support initialization failed")
 	}
 
-	b.AssignHandlers(mux, h)
-	var bridgeIn io.ReadCloser
-	var bridgeOut io.WriteCloser
-	if *useInOutErr {
-		bridgeIn = os.Stdin
-		bridgeOut = os.Stdout
-	} else {
-		const commandPort uint32 = 0x40000000
-		bridgeCon, err := tport.Dial(commandPort)
-		if err != nil {
-			logrus.WithFields(logrus.Fields{
-				"port":          commandPort,
-				logrus.ErrorKey: err,
-			}).Fatal("failed to dial host vsock connection")
-		}
-		bridgeIn = bridgeCon
-		bridgeOut = bridgeCon
-	}
+	// During live migration the VM is frozen and only wakes up when the host
+	// shim is ready, so the vsock port should be immediately available. We
+	// use a tight retry interval instead of exponential backoff.
+	const reconnectInterval = 100 * time.Millisecond
 
 	event := cgroups1.MemoryThresholdEvent(*gcsMemLimitBytes, false)
 	gefd, err := gcsControl.RegisterMemoryEvent(event)
@@ -448,10 +431,42 @@ func main() {
 	go readMemoryEvents(startTime, gefdFile, "/gcs", int64(*gcsMemLimitBytes), gcsControl)
 	go readMemoryEvents(startTime, oomFile, "/containers", containersLimit, containersControl)
 	go readMemoryEvents(startTime, virtualPodsOomFile, "/containers/virtual-pods", containersLimit, virtualPodsControl)
-	err = b.ListenAndServe(bridgeIn, bridgeOut)
-	if err != nil {
-		logrus.WithFields(logrus.Fields{
-			logrus.ErrorKey: err,
-		}).Fatal("failed to serve gcs service")
+
+	mux := bridge.NewBridgeMux()
+	b := bridge.New(mux, *v4)
+	b.AssignHandlers(mux, h)
+
+	// Reconnect loop: dial the host, serve until the connection drops, then
+	// re-dial. During live migration the VM is frozen and only wakes up when
+	// the destination host shim is ready, so the vsock port should be
+	// immediately available.
+	for {
+		var bridgeIn io.ReadCloser
+		var bridgeOut io.WriteCloser
+		if *useInOutErr {
+			bridgeIn = os.Stdin
+			bridgeOut = os.Stdout
+		} else {
+			bridgeCon, dialErr := tport.Dial(prot.LinuxGcsVsockPort)
+			if dialErr != nil {
+				logrus.WithError(dialErr).Warn("failed to dial host, retrying")
+				time.Sleep(reconnectInterval)
+				continue
+			}
+			bridgeIn = bridgeCon
+			bridgeOut = bridgeCon
+		}
+
+		logrus.Info("bridge connected, serving")
+
+		serveErr := b.ListenAndServe(bridgeIn, bridgeOut)
+
+		if b.ShutdownRequested() {
+			logrus.Info("bridge shutdown requested, exiting reconnect loop")
+			break
+		}
+
+		logrus.WithError(serveErr).Warn("bridge connection lost, will reconnect")
+		time.Sleep(reconnectInterval)
 	}
 }
diff --git a/internal/guest/bridge/bridge.go b/internal/guest/bridge/bridge.go
index 4ea03ed104..d8d57e786d 100644
--- a/internal/guest/bridge/bridge.go
+++ b/internal/guest/bridge/bridge.go
@@ -189,6 +189,71 @@ type Bridge struct {
 	hasQuitPending atomic.Bool
 
 	protVer prot.ProtocolVersion
+
+	// notifyMu guards connected and pendingNotifications.
+	notifyMu sync.Mutex
+	// connected is true while ListenAndServe is running and responseChan
+	// is valid. When false, notifications are queued in pendingNotifications.
+	connected bool
+	// pendingNotifications holds container exit notifications that arrived
+	// while the bridge was disconnected. They are drained when ListenAndServe
+	// reconnects.
+	pendingNotifications []*prot.ContainerNotification
+}
+
+// New creates a Bridge. Container exit notifications are queued while
+// disconnected and drained on reconnect.
+func New(handler Handler, enableV4 bool) *Bridge {
+	return &Bridge{
+		Handler:  handler,
+		EnableV4: enableV4,
+	}
+}
+
+// publishNotification routes a container exit notification to the bridge.
+// If the bridge is disconnected, the notification is queued and will be
+// drained when ListenAndServe reconnects.
+func (b *Bridge) publishNotification(n *prot.ContainerNotification) {
+	b.notifyMu.Lock()
+	if !b.connected {
+		logrus.WithField("containerID", n.ContainerID).
+			Warn("bridge not connected, queueing container notification")
+		b.pendingNotifications = append(b.pendingNotifications, n)
+		b.notifyMu.Unlock()
+		return
+	}
+	b.notifyMu.Unlock()
+	b.PublishNotification(n)
+}
+
+// drainPendingNotifications sends any queued notifications through the bridge.
+// Must be called after responseChan is valid and the response writer goroutine
+// is running.
+func (b *Bridge) drainPendingNotifications() {
+	b.notifyMu.Lock()
+	b.connected = true
+	drain := b.pendingNotifications
+	b.pendingNotifications = nil
+	b.notifyMu.Unlock()
+
+	for _, n := range drain {
+		logrus.WithField("containerID", n.ContainerID).
+			Info("draining queued container notification")
+		b.PublishNotification(n)
+	}
+}
+
+// disconnectNotifications marks the bridge as disconnected so future
+// publishNotification calls queue instead of sending.
+func (b *Bridge) disconnectNotifications() {
+	b.notifyMu.Lock()
+	b.connected = false
+	b.notifyMu.Unlock()
+}
+
+// ShutdownRequested returns true if the bridge has been asked to shut down.
+func (b *Bridge) ShutdownRequested() bool {
+	return b.hasQuitPending.Load()
 }
 
 // AssignHandlers creates and assigns the appropriate bridge
@@ -230,13 +295,17 @@ func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser
 	responseErrChan := make(chan error)
 	b.quitChan = make(chan bool)
 
-	defer close(b.quitChan)
+	// Defers execute in LIFO order. We close quitChan first to stop the
+	// read loop, then disconnect notifications (so any late publishNotification
+	// calls queue instead of sending to a closing responseChan), then tear
+	// down the remaining channels and connections.
+	defer bridgeIn.Close()
+	defer close(requestErrChan)
+	defer close(requestChan)
 	defer bridgeOut.Close()
 	defer close(responseErrChan)
-	defer close(b.responseChan)
-	defer close(requestChan)
-	defer close(requestErrChan)
-	defer bridgeIn.Close()
+	defer b.disconnectNotifications()
+	defer close(b.quitChan)
 
 	// Receive bridge requests and schedule them to be processed.
 	go func() {
@@ -396,6 +465,9 @@ func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser
 		responseErrChan <- resperr
 	}()
 
+	// Drain queued notifications now that the response writer is running.
+	b.drainPendingNotifications()
+
 	select {
 	case err := <-requestErrChan:
 		return err
diff --git a/internal/guest/bridge/bridge_reconnect_test.go b/internal/guest/bridge/bridge_reconnect_test.go
new file mode 100644
index 0000000000..2a8c8b702a
--- /dev/null
+++ b/internal/guest/bridge/bridge_reconnect_test.go
@@ -0,0 +1,165 @@
+//go:build linux
+
+package bridge
+
+import (
+	"io"
+	"testing"
+	"time"
+
+	"github.com/Microsoft/hcsshim/internal/guest/prot"
+)
+
+func TestBridge_NotificationQueuedWhenDisconnected(t *testing.T) {
+	b := New(nil, false)
+	// Bridge starts disconnected (connected == false).
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "c1"},
+	})
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "c2"},
+	})
+
+	b.notifyMu.Lock()
+	if len(b.pendingNotifications) != 2 {
+		t.Fatalf("expected 2 queued, got %d", len(b.pendingNotifications))
+	}
+	b.notifyMu.Unlock()
+}
+
+func TestBridge_DrainOnReconnect(t *testing.T) {
+	b := New(nil, false)
+
+	// Queue notifications while disconnected.
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "c1"},
+	})
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "c2"},
+	})
+
+	// Simulate what ListenAndServe does: create channels, start writer,
+	// then drain.
+	b.responseChan = make(chan bridgeResponse, 4)
+
+	b.drainPendingNotifications()
+
+	// Collect drained notifications.
+	var ids []string
+	for i := 0; i < 2; i++ {
+		select {
+		case resp := <-b.responseChan:
+			n := resp.response.(*prot.ContainerNotification)
+			ids = append(ids, n.ContainerID)
+		case <-time.After(time.Second):
+			t.Fatalf("timed out waiting for notification %d", i)
+		}
+	}
+	if len(ids) != 2 {
+		t.Fatalf("expected 2 drained notifications, got %d", len(ids))
+	}
+
+	b.notifyMu.Lock()
+	if len(b.pendingNotifications) != 0 {
+		t.Fatalf("expected 0 pending after drain, got %d", len(b.pendingNotifications))
+	}
+	b.notifyMu.Unlock()
+}
+
+func TestBridge_DisconnectQueuesAfterDrain(t *testing.T) {
+	b := New(nil, false)
+	b.responseChan = make(chan bridgeResponse, 4)
+
+	// Drain with nothing pending; this just sets connected = true.
+	b.drainPendingNotifications()
+
+	// Send while connected: goes directly to responseChan.
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "live"},
+	})
+
+	select {
+	case resp := <-b.responseChan:
+		n := resp.response.(*prot.ContainerNotification)
+		if n.ContainerID != "live" {
+			t.Fatalf("expected 'live', got %q", n.ContainerID)
+		}
+	default:
+		t.Fatal("expected notification on responseChan")
+	}
+
+	// Disconnect: future notifications should queue.
+	b.disconnectNotifications()
+
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "queued"},
+	})
+
+	b.notifyMu.Lock()
+	if len(b.pendingNotifications) != 1 {
+		t.Fatalf("expected 1 queued after disconnect, got %d", len(b.pendingNotifications))
+	}
+	b.notifyMu.Unlock()
+
+	// Nothing should be on responseChan.
+	select {
+	case <-b.responseChan:
+		t.Fatal("should not have received on responseChan after disconnect")
+	default:
+	}
+}
+
+func TestBridge_FullReconnectCycle(t *testing.T) {
+	b := New(nil, false)
+
+	// --- Iteration 1: simulate ListenAndServe ---
+	r1, w1 := io.Pipe()
+	b.responseChan = make(chan bridgeResponse, 4)
+	b.quitChan = make(chan bool)
+
+	go func() {
+		for range b.responseChan {
+		}
+	}() // drain writer
+
+	b.drainPendingNotifications()
+
+	// Send a notification while connected.
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "iter1"},
+	})
+
+	// Simulate bridge drop: disconnect, close channels.
+	b.disconnectNotifications()
+	close(b.quitChan)
+	close(b.responseChan)
+	r1.Close()
+	w1.Close()
+
+	// --- Between iterations: container exits ---
+	b.publishNotification(&prot.ContainerNotification{
+		MessageBase: prot.MessageBase{ContainerID: "between"},
+	})
+
+	b.notifyMu.Lock()
+	if len(b.pendingNotifications) != 1 || b.pendingNotifications[0].ContainerID != "between" {
+		t.Fatalf("expected 'between' queued, got %v", b.pendingNotifications)
+	}
+	b.notifyMu.Unlock()
+
+	// --- Iteration 2: reconnect ---
+	b.responseChan = make(chan bridgeResponse, 4)
+	b.quitChan = make(chan bool)
+
+	b.drainPendingNotifications()
+
+	select {
+	case resp := <-b.responseChan:
+		n := resp.response.(*prot.ContainerNotification)
+		if n.ContainerID != "between" {
+			t.Fatalf("expected 'between', got %q", n.ContainerID)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("timed out waiting for drained notification")
+	}
+}
diff --git a/internal/guest/bridge/bridge_v2.go b/internal/guest/bridge/bridge_v2.go
index 800094e549..dc13ec6428 100644
--- a/internal/guest/bridge/bridge_v2.go
+++ b/internal/guest/bridge/bridge_v2.go
@@ -123,7 +123,7 @@ func (b *Bridge) createContainerV2(r *Request) (_ RequestResponse, err error) {
 			Result:     0,
 			ResultInfo: "",
 		}
-		b.PublishNotification(notification)
+		b.publishNotification(notification)
 	}()
 
 	return &prot.ContainerCreateResponse{}, nil
diff --git a/internal/guest/prot/protocol.go b/internal/guest/prot/protocol.go
index 16e1f9daa3..ef88bd278e 100644
--- a/internal/guest/prot/protocol.go
+++ b/internal/guest/prot/protocol.go
@@ -39,6 +39,11 @@ const (
 	messageVersionShift = 0
 )
 
+const (
+	// LinuxGcsVsockPort is the vsock port the GCS connects to on the host.
+	LinuxGcsVsockPort uint32 = 0x40000000
+)
+
 // MessageType is the type of the message.
 type MessageType uint32