Merged
71 changes: 43 additions & 28 deletions cmd/gcs/main.go
@@ -24,6 +24,7 @@ import (
"github.com/Microsoft/hcsshim/internal/guest/bridge"
"github.com/Microsoft/hcsshim/internal/guest/cgroup"
"github.com/Microsoft/hcsshim/internal/guest/kmsg"
"github.com/Microsoft/hcsshim/internal/guest/prot"
"github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2"
"github.com/Microsoft/hcsshim/internal/guest/runtime/runc"
"github.com/Microsoft/hcsshim/internal/guest/transport"
@@ -385,35 +386,17 @@ func main() {
if err != nil {
logrus.WithError(err).Fatal("failed to initialize new runc runtime")
}
mux := bridge.NewBridgeMux()
b := bridge.Bridge{
Handler: mux,
EnableV4: *v4,
}

h := hcsv2.NewHost(rtime, tport, initialEnforcer, logWriter)
// Initialize virtual pod support in the host
if err := h.InitializeVirtualPodSupport(virtualPodsControl); err != nil {
logrus.WithError(err).Warn("Virtual pod support initialization failed")
}
b.AssignHandlers(mux, h)

var bridgeIn io.ReadCloser
var bridgeOut io.WriteCloser
if *useInOutErr {
bridgeIn = os.Stdin
bridgeOut = os.Stdout
} else {
const commandPort uint32 = 0x40000000
bridgeCon, err := tport.Dial(commandPort)
if err != nil {
logrus.WithFields(logrus.Fields{
"port": commandPort,
logrus.ErrorKey: err,
}).Fatal("failed to dial host vsock connection")
}
bridgeIn = bridgeCon
bridgeOut = bridgeCon
}
// During live migration the VM is frozen and only wakes up when the host
// shim is ready, so the vsock port should be immediately available. We
// use a tight retry interval instead of exponential backoff.
const reconnectInterval = 100 * time.Millisecond

event := cgroups1.MemoryThresholdEvent(*gcsMemLimitBytes, false)
gefd, err := gcsControl.RegisterMemoryEvent(event)
@@ -448,10 +431,42 @@ func main() {
go readMemoryEvents(startTime, gefdFile, "/gcs", int64(*gcsMemLimitBytes), gcsControl)
go readMemoryEvents(startTime, oomFile, "/containers", containersLimit, containersControl)
go readMemoryEvents(startTime, virtualPodsOomFile, "/containers/virtual-pods", containersLimit, virtualPodsControl)
err = b.ListenAndServe(bridgeIn, bridgeOut)
if err != nil {
logrus.WithFields(logrus.Fields{
logrus.ErrorKey: err,
}).Fatal("failed to serve gcs service")

mux := bridge.NewBridgeMux()
b := bridge.New(mux, *v4)
b.AssignHandlers(mux, h)

// Reconnect loop: dial the host, serve until the connection drops, then
// re-dial. During live migration the VM is frozen and only wakes up when
// the destination host shim is ready, so the vsock port should be
// immediately available.
for {
Comment thread:

Contributor: @jterry75 I wonder if we should retry indefinitely or enforce a limit, say 12000 iterations (20 minutes). In that case, if the shim died but the VM is still running, the VM can self-terminate after 20 minutes of inactivity. While that is fragile, I feel that 20 minutes of inactivity after blackout is a critical failure path anyway. What do you think?

Contributor Author: Good question for Justin. My instinct is to retry forever — if the shim died but the VM is still running, the GCS should keep trying until either the host comes back or the VM is torn down externally. A 20-minute timeout would leave the VM in a zombie state where it's running but can't be managed. That said, happy to add a configurable limit if Justin thinks there's a scenario where self-termination is better.

Contributor: No, it's not a zombie VM. IIRC, if the GCS crashes, the VM also exits, since the GCS is its init process.

Contributor: The host controls the lifetime of these VMs. Even for a zombie VM, the host would be responsible for cleaning up, so we should just loop forever. If there is no connection and nobody terminates this VM, that's a separate bug to deal with. But that should never happen, since we use terminate-on-last-handle-closed. (For comparison, a bounded-retry sketch follows the main.go diff below.)

var bridgeIn io.ReadCloser
var bridgeOut io.WriteCloser
if *useInOutErr {
bridgeIn = os.Stdin
bridgeOut = os.Stdout
} else {
bridgeCon, dialErr := tport.Dial(prot.LinuxGcsVsockPort)
if dialErr != nil {
logrus.WithError(dialErr).Warn("failed to dial host, retrying")
time.Sleep(reconnectInterval)
continue
}
bridgeIn = bridgeCon
bridgeOut = bridgeCon
}

logrus.Info("bridge connected, serving")

serveErr := b.ListenAndServe(bridgeIn, bridgeOut)

if b.ShutdownRequested() {
logrus.Info("bridge shutdown requested, exiting reconnect loop")
break
}

logrus.WithError(serveErr).Warn("bridge connection lost, will reconnect")
time.Sleep(reconnectInterval)
}
}
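For comparison with the review thread above, here is a sketch of the bounded-retry variant the reviewers debated. It is hypothetical, not the merged code (the merged loop retries indefinitely); dial, serve, and shutdownRequested are stand-ins for tport.Dial, b.ListenAndServe, and b.ShutdownRequested.

package bridge

import (
	"errors"
	"io"
	"time"
)

// serveWithRetryBudget is a hypothetical bounded variant of the reconnect
// loop debated in the review thread. It is NOT the merged behavior; the
// merged loop retries forever.
func serveWithRetryBudget(
	dial func() (io.ReadWriteCloser, error),
	serve func(in io.ReadCloser, out io.WriteCloser) error,
	shutdownRequested func() bool,
) error {
	const (
		reconnectInterval = 100 * time.Millisecond
		maxRetries        = 12000 // ~20 minutes at 100ms per attempt
	)
	for attempt := 0; attempt < maxRetries; attempt++ {
		conn, err := dial()
		if err != nil {
			// Host not ready yet; retry on a tight interval.
			time.Sleep(reconnectInterval)
			continue
		}
		attempt = 0 // a successful dial resets the budget
		err = serve(conn, conn)
		if shutdownRequested() {
			return nil // clean shutdown requested; leave the loop
		}
		_ = err // connection dropped; re-dial after a short pause
		time.Sleep(reconnectInterval)
	}
	// Budget exhausted: self-terminate so a host-less VM doesn't linger.
	return errors.New("no host connection within retry budget")
}

As the final reviewer notes, the merged PR deliberately loops forever: the host owns the VM's lifetime, so a guest-side retry budget would add a failure mode rather than remove one.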
82 changes: 77 additions & 5 deletions internal/guest/bridge/bridge.go
@@ -189,6 +189,71 @@ type Bridge struct {
hasQuitPending atomic.Bool

protVer prot.ProtocolVersion

// notifyMu guards connected and pendingNotifications.
notifyMu sync.Mutex
// connected is true while ListenAndServe is running and responseChan
// is valid. When false, notifications are queued in pendingNotifications.
connected bool
// pendingNotifications holds container exit notifications that arrived
// while the bridge was disconnected. They are drained when ListenAndServe
// reconnects.
pendingNotifications []*prot.ContainerNotification
}

// New creates a Bridge. Container exit notifications are queued while
// disconnected and drained on reconnect.
func New(handler Handler, enableV4 bool) *Bridge {
return &Bridge{
Handler: handler,
EnableV4: enableV4,
}
}

// publishNotification routes a container exit notification to the bridge.
// If the bridge is disconnected, the notification is queued and will be
// drained when ListenAndServe reconnects.
func (b *Bridge) publishNotification(n *prot.ContainerNotification) {
b.notifyMu.Lock()
if !b.connected {
logrus.WithField("containerID", n.ContainerID).
Warn("bridge not connected, queueing container notification")
b.pendingNotifications = append(b.pendingNotifications, n)
b.notifyMu.Unlock()
return
}
b.notifyMu.Unlock()
b.PublishNotification(n)
}

// drainPendingNotifications sends any queued notifications through the bridge.
// Must be called after responseChan is valid and the response writer goroutine
// is running.
func (b *Bridge) drainPendingNotifications() {
b.notifyMu.Lock()
b.connected = true
drain := b.pendingNotifications
b.pendingNotifications = nil
b.notifyMu.Unlock()

for _, n := range drain {
logrus.WithField("containerID", n.ContainerID).
Info("draining queued container notification")
b.PublishNotification(n)
}
}

// disconnectNotifications marks the bridge as disconnected so future
// publishNotification calls queue instead of sending.
func (b *Bridge) disconnectNotifications() {
b.notifyMu.Lock()
b.connected = false
b.notifyMu.Unlock()
}

// ShutdownRequested returns true if the bridge has been asked to shut down.
func (b *Bridge) ShutdownRequested() bool {
return b.hasQuitPending.Load()
}

// AssignHandlers creates and assigns the appropriate bridge
@@ -230,13 +295,17 @@ func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser
responseErrChan := make(chan error)
b.quitChan = make(chan bool)

defer close(b.quitChan)
// Defers execute in LIFO order. We close quitChan first to stop the
// read loop, then disconnect notifications (so any late publishNotification
// calls queue instead of sending to a closing responseChan), then tear
// down the remaining channels and connections.
defer bridgeIn.Close()
defer close(requestErrChan)
defer close(requestChan)
defer bridgeOut.Close()
defer close(responseErrChan)
defer close(b.responseChan)
defer close(requestChan)
defer close(requestErrChan)
defer bridgeIn.Close()
defer b.disconnectNotifications()
defer close(b.quitChan)

// Receive bridge requests and schedule them to be processed.
go func() {
@@ -396,6 +465,9 @@ func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser
responseErrChan <- resperr
}()

// Drain queued notifications now that the response writer is running.
b.drainPendingNotifications()

select {
case err := <-requestErrChan:
return err
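The reordered defers above rely on Go running deferred calls in LIFO order, so close(b.quitChan), registered last, is the first to run on return. A minimal standalone illustration of that ordering (not from the PR):

package main

import "fmt"

func main() {
	// Registered first, so it runs last: mirrors `defer bridgeIn.Close()`.
	defer fmt.Println("3: close connections")
	// Mirrors `defer b.disconnectNotifications()`.
	defer fmt.Println("2: disconnect notifications")
	// Registered last, so it runs first: mirrors `defer close(b.quitChan)`.
	defer fmt.Println("1: stop the read loop")
	fmt.Println("0: serving")
}

// Output:
// 0: serving
// 1: stop the read loop
// 2: disconnect notifications
// 3: close connections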
165 changes: 165 additions & 0 deletions internal/guest/bridge/bridge_reconnect_test.go
@@ -0,0 +1,165 @@
//go:build linux

package bridge

import (
"io"
"testing"
"time"

"github.com/Microsoft/hcsshim/internal/guest/prot"
)

func TestBridge_NotificationQueuedWhenDisconnected(t *testing.T) {
b := New(nil, false)
// Bridge starts disconnected (connected == false).
b.publishNotification(&prot.ContainerNotification{
MessageBase: prot.MessageBase{ContainerID: "c1"},
})
b.publishNotification(&prot.ContainerNotification{
MessageBase: prot.MessageBase{ContainerID: "c2"},
})

b.notifyMu.Lock()
if len(b.pendingNotifications) != 2 {
t.Fatalf("expected 2 queued, got %d", len(b.pendingNotifications))
}
b.notifyMu.Unlock()
}

func TestBridge_DrainOnReconnect(t *testing.T) {
b := New(nil, false)

// Queue notifications while disconnected.
b.publishNotification(&prot.ContainerNotification{
MessageBase: prot.MessageBase{ContainerID: "c1"},
})
b.publishNotification(&prot.ContainerNotification{
MessageBase: prot.MessageBase{ContainerID: "c2"},
})

// Simulate what ListenAndServe does: create channels, start writer,
// then drain.
b.responseChan = make(chan bridgeResponse, 4)

b.drainPendingNotifications()

// Collect drained notifications.
var ids []string
for i := 0; i < 2; i++ {
select {
case resp := <-b.responseChan:
n := resp.response.(*prot.ContainerNotification)
ids = append(ids, n.ContainerID)
case <-time.After(time.Second):
t.Fatalf("timed out waiting for notification %d", i)
}
}
if len(ids) != 2 {
t.Fatalf("expected 2 drained notifications, got %d", len(ids))
}

b.notifyMu.Lock()
if len(b.pendingNotifications) != 0 {
t.Fatalf("expected 0 pending after drain, got %d", len(b.pendingNotifications))
}
b.notifyMu.Unlock()
}

func TestBridge_DisconnectQueuesAfterDrain(t *testing.T) {
b := New(nil, false)
b.responseChan = make(chan bridgeResponse, 4)

// Drain with nothing pending — just sets connected = true.
b.drainPendingNotifications()

// Send while connected — goes directly to responseChan.
b.publishNotification(&prot.ContainerNotification{
MessageBase: prot.MessageBase{ContainerID: "live"},
})

select {
case resp := <-b.responseChan:
n := resp.response.(*prot.ContainerNotification)
if n.ContainerID != "live" {
t.Fatalf("expected 'live', got %q", n.ContainerID)
}
default:
t.Fatal("expected notification on responseChan")
}

// Disconnect — future notifications should queue.
b.disconnectNotifications()

b.publishNotification(&prot.ContainerNotification{
MessageBase: prot.MessageBase{ContainerID: "queued"},
})

b.notifyMu.Lock()
if len(b.pendingNotifications) != 1 {
t.Fatalf("expected 1 queued after disconnect, got %d", len(b.pendingNotifications))
}
b.notifyMu.Unlock()

// Nothing should be on responseChan.
select {
case <-b.responseChan:
t.Fatal("should not have received on responseChan after disconnect")
default:
}
}

func TestBridge_FullReconnectCycle(t *testing.T) {
b := New(nil, false)

// --- Iteration 1: simulate ListenAndServe ---
r1, w1 := io.Pipe()
b.responseChan = make(chan bridgeResponse, 4)
b.quitChan = make(chan bool)

go func() {
for range b.responseChan {
}
}() // drain writer

b.drainPendingNotifications()

// Send a notification while connected.
b.publishNotification(&prot.ContainerNotification{
MessageBase: prot.MessageBase{ContainerID: "iter1"},
})

// Simulate bridge drop — disconnect, close channels.
b.disconnectNotifications()
close(b.quitChan)
close(b.responseChan)
r1.Close()
w1.Close()

// --- Between iterations: container exits ---
b.publishNotification(&prot.ContainerNotification{
MessageBase: prot.MessageBase{ContainerID: "between"},
})

b.notifyMu.Lock()
if len(b.pendingNotifications) != 1 || b.pendingNotifications[0].ContainerID != "between" {
t.Fatalf("expected 'between' queued, got %v", b.pendingNotifications)
}
b.notifyMu.Unlock()

// --- Iteration 2: reconnect ---
b.responseChan = make(chan bridgeResponse, 4)
b.quitChan = make(chan bool)

b.drainPendingNotifications()

select {
case resp := <-b.responseChan:
n := resp.response.(*prot.ContainerNotification)
if n.ContainerID != "between" {
t.Fatalf("expected 'between', got %q", n.ContainerID)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for drained notification")
}
}
2 changes: 1 addition & 1 deletion internal/guest/bridge/bridge_v2.go
@@ -123,7 +123,7 @@ func (b *Bridge) createContainerV2(r *Request) (_ RequestResponse, err error) {
Result: 0,
ResultInfo: "",
}
b.PublishNotification(notification)
b.publishNotification(notification)
}()

return &prot.ContainerCreateResponse{}, nil
5 changes: 5 additions & 0 deletions internal/guest/prot/protocol.go
@@ -39,6 +39,11 @@ const (
messageVersionShift = 0
)

const (
// LinuxGcsVsockPort is the vsock port the GCS connects to on the host.
LinuxGcsVsockPort uint32 = 0x40000000
)

// MessageType is the type of the message.
type MessageType uint32
