diff --git a/cmd/containerd-shim-runhcs-v1/exec_hcs.go b/cmd/containerd-shim-runhcs-v1/exec_hcs.go index fe15f321e3..94a50bae8d 100644 --- a/cmd/containerd-shim-runhcs-v1/exec_hcs.go +++ b/cmd/containerd-shim-runhcs-v1/exec_hcs.go @@ -613,16 +613,22 @@ func (he *hcsExec) waitForExit() { he.ioWg.Wait() he.io.Close() - // We had a valid process so send the exited notification. - he.events( - runtime.TaskExitEventTopic, - &eventstypes.TaskExit{ - ContainerID: he.tid, - ID: he.id, - Pid: uint32(he.pid), - ExitStatus: he.exitStatus, - ExitedAt: he.exitedAt, - }) + // Only send the `runtime.TaskExitEventTopic` notification if this is a true + // exec. For the `init` exec this is handled in task teardown. + if he.tid != he.id { + // We had a valid process so send the exited notification. + he.events( + runtime.TaskExitEventTopic, + &eventstypes.TaskExit{ + ContainerID: he.tid, + ID: he.id, + Pid: uint32(he.pid), + ExitStatus: he.exitStatus, + ExitedAt: he.exitedAt, + }) + } + + // Free any waiters. he.exitedOnce.Do(func() { close(he.exited) }) diff --git a/cmd/containerd-shim-runhcs-v1/exec_wcow_podsandbox.go b/cmd/containerd-shim-runhcs-v1/exec_wcow_podsandbox.go index 65d17784ea..5797c99672 100644 --- a/cmd/containerd-shim-runhcs-v1/exec_wcow_podsandbox.go +++ b/cmd/containerd-shim-runhcs-v1/exec_wcow_podsandbox.go @@ -166,15 +166,10 @@ func (wpse *wcowPodSandboxExec) Kill(ctx context.Context, signal uint32) error { wpse.state = shimExecStateExited wpse.exitStatus = 0 wpse.exitedAt = time.Now() - wpse.events( - runtime.TaskExitEventTopic, - &eventstypes.TaskExit{ - ContainerID: wpse.tid, - ID: wpse.tid, // The init exec ID is always the same as Task ID. - Pid: uint32(wpse.pid), - ExitStatus: wpse.exitStatus, - ExitedAt: wpse.exitedAt, - }) + + // NOTE: We do not support a non `init` exec for this "fake" init + // process. Skip any exited event which will be sent by the task. 
+ close(wpse.exited) return nil case shimExecStateExited: @@ -233,22 +228,13 @@ func (wpse *wcowPodSandboxExec) ForceExit(status int) { "status": status, }).Debug("hcsExec::ForceExit") - wasRunning := wpse.state == shimExecStateRunning wpse.state = shimExecStateExited wpse.exitStatus = 1 wpse.exitedAt = time.Now() - if wasRunning { - wpse.events( - runtime.TaskExitEventTopic, - &eventstypes.TaskExit{ - ContainerID: wpse.tid, - ID: wpse.tid, // The init exec ID is always the same as Task ID. - Pid: uint32(wpse.pid), - ExitStatus: wpse.exitStatus, - ExitedAt: wpse.exitedAt, - }) - } + // NOTE: We do not support a non `init` exec for this "fake" init + // process. Skip any exited event which will be sent by the task. + close(wpse.exited) } } diff --git a/cmd/containerd-shim-runhcs-v1/task_hcs.go b/cmd/containerd-shim-runhcs-v1/task_hcs.go index 74d4400738..bb02ea4cf3 100644 --- a/cmd/containerd-shim-runhcs-v1/task_hcs.go +++ b/cmd/containerd-shim-runhcs-v1/task_hcs.go @@ -161,9 +161,13 @@ func newHcsTask( io) if parent != nil { + // We have a parent UVM. Listen for its exit and forcibly close this + // task. This is not expected but in the event of a UVM crash we need to + // handle this case. go ht.waitForHostExit() } - + // In the normal case the `Signal` call from the caller killed this task's + // init process. go func() { // Wait for our init process to exit. 
ht.init.Wait(context.Background()) @@ -468,14 +472,7 @@ func (ht *hcsTask) waitForHostExit() { return false }) ht.init.ForceExit(1) - ht.closeHostOnce.Do(func() { - if err := ht.host.Close(); err != nil { - logrus.WithFields(logrus.Fields{ - "tid": ht.id, - logrus.ErrorKey: err, - }).Error("hcsTask::close - failed host vm shutdown") - } - }) + ht.closeHost() } // close shuts down the container that is owned by this task and if @@ -544,18 +541,37 @@ func (ht *hcsTask) close() { }).Error("hcsTask::close - failed to close container") } } + ht.closeHost() + }) +} +// closeHost safely closes the hosting UVM if this task is the owner. Once +// closed and all resources released it events the `runtime.TaskExitEventTopic` +// for all upstream listeners. +// +// Note: If this is a process isolated task the hosting UVM is simply a `noop`. +// +// This call is idempotent and safe to call multiple times. +func (ht *hcsTask) closeHost() { + ht.closeHostOnce.Do(func() { if ht.ownsHost && ht.host != nil { - // This task is also the host owner. Shutdown the host as part of - // the init process going down. - ht.closeHostOnce.Do(func() { - if err := ht.host.Close(); err != nil { - logrus.WithFields(logrus.Fields{ - "tid": ht.id, - logrus.ErrorKey: err, - }).Error("hcsTask::close - failed host vm shutdown") - } - }) + if err := ht.host.Close(); err != nil { + logrus.WithFields(logrus.Fields{ + "tid": ht.id, + logrus.ErrorKey: err, + }).Error("hcsTask::closeHost - failed host vm shutdown") + } } + // Send the `init` exec exit notification always. 
+ exit := ht.init.Status() + ht.events( + runtime.TaskExitEventTopic, + &eventstypes.TaskExit{ + ContainerID: ht.id, + ID: exit.ID, + Pid: uint32(exit.Pid), + ExitStatus: exit.ExitStatus, + ExitedAt: exit.ExitedAt, + }) }) } diff --git a/cmd/containerd-shim-runhcs-v1/task_wcow_podsandbox.go b/cmd/containerd-shim-runhcs-v1/task_wcow_podsandbox.go index 9f72ebd83b..16f9e938b4 100644 --- a/cmd/containerd-shim-runhcs-v1/task_wcow_podsandbox.go +++ b/cmd/containerd-shim-runhcs-v1/task_wcow_podsandbox.go @@ -17,6 +17,14 @@ import ( "github.com/sirupsen/logrus" ) +// newWcowPodSandboxTask creates a fake WCOW task with a fake WCOW `init` +// process as a performance optimization rather than creating an actual +// container and process since it is not needed to hold open any namespaces like +// the equivalent on Linux. +// +// It is assumed that this is the only fake WCOW task and that this task owns +// `parent`. When the fake WCOW `init` process exits via `Signal` `parent` will +// be forcibly closed by this task. func newWcowPodSandboxTask(ctx context.Context, events publisher, id, bundle string, parent *uvm.UtilityVM) shimTask { logrus.WithFields(logrus.Fields{ "tid": id, @@ -28,6 +36,9 @@ func newWcowPodSandboxTask(ctx context.Context, events publisher, id, bundle str init: newWcowPodSandboxExec(ctx, events, id, bundle), } if parent != nil { + // We have (and own) a parent UVM. Listen for its exit and forcibly + // close this task. This is not expected but in the event of a UVM crash + // we need to handle this case. go func() { werr := parent.Wait() if werr != nil && !hcs.IsAlreadyClosed(werr) { @@ -40,9 +51,20 @@ func newWcowPodSandboxTask(ctx context.Context, events publisher, id, bundle str // already) to unblock any waiters since the platform wont send any // events for this fake process. wpst.init.ForceExit(1) - parent.Close() + + // Close the host and event the exit. 
+ wpst.close() }() } + // In the normal case the `Signal` call from the caller killed this fake + // init process. + go func() { + // Wait for it to exit on its own + wpst.init.Wait(context.Background()) + + // Close the host and event the exit + wpst.close() + }() return wpst } @@ -118,14 +140,6 @@ func (wpst *wcowPodSandboxTask) KillExec(ctx context.Context, eid string, signal if err != nil { return err } - if eid == "" { - // We killed the fake init task. Bring down the uvm. - wpst.closeOnce.Do(func() { - if wpst.host != nil { - wpst.host.Close() - } - }) - } return nil } @@ -171,3 +185,34 @@ func (wpst *wcowPodSandboxTask) Pids(ctx context.Context) ([]options.ProcessDeta }, }, nil } + +// close safely closes the hosting UVM. Because of the specialty of this task it +// is assumed that this is always the owner of `wpst.host`. Once closed and all +// resources released it events the `runtime.TaskExitEventTopic` for all +// upstream listeners. +// +// This call is idempotent and safe to call multiple times. +func (wpst *wcowPodSandboxTask) close() { + wpst.closeOnce.Do(func() { + // Tolerate a missing host; success and already-closed are not errors. + if wpst.host != nil { + if err := wpst.host.Close(); err != nil && !hcs.IsAlreadyClosed(err) { + logrus.WithFields(logrus.Fields{ + "tid": wpst.id, + logrus.ErrorKey: err, + }).Error("wcowPodSandboxTask::close - failed host vm shutdown") + } + } + // Send the `init` exec exit notification always. 
+ exit := wpst.init.Status() + wpst.events( + runtime.TaskExitEventTopic, + &eventstypes.TaskExit{ + ContainerID: wpst.id, + ID: exit.ID, + Pid: uint32(exit.Pid), + ExitStatus: exit.ExitStatus, + ExitedAt: exit.ExitedAt, + }) + }) +} diff --git a/internal/hcs/process.go b/internal/hcs/process.go index 0a9c9199c9..93a7831f40 100644 --- a/internal/hcs/process.go +++ b/internal/hcs/process.go @@ -23,6 +23,9 @@ type Process struct { callbackNumber uintptr logctx logrus.Fields + + waitBlock chan struct{} + waitError error } func newProcess(process hcsProcess, processID int, computeSystem *System) *Process { @@ -34,6 +37,7 @@ func newProcess(process hcsProcess, processID int, computeSystem *System) *Proce logfields.ContainerID: computeSystem.ID(), logfields.ProcessID: processID, }, + waitBlock: make(chan struct{}), } } @@ -163,33 +167,47 @@ func (process *Process) Kill() (err error) { return nil } -// Wait waits for the process to exit. +// waitBackground waits for the process exit notification. Once received sets +// `process.waitError` (if any) and unblocks all `Wait` and `WaitTimeout` calls. +// +// This MUST be called exactly once per `process.handle` but `Wait` and +// `WaitTimeout` are safe to call multiple times. +func (process *Process) waitBackground() { + process.waitError = waitForNotification(process.callbackNumber, hcsNotificationProcessExited, nil) + close(process.waitBlock) +} + +// Wait waits for the process to exit. If the process has already exited returns +// the previous error (if any). 
func (process *Process) Wait() (err error) { operation := "hcsshim::Process::Wait" process.logOperationBegin(operation) defer func() { process.logOperationEnd(operation, err) }() - err = waitForNotification(process.callbackNumber, hcsNotificationProcessExited, nil) - if err != nil { - return makeProcessError(process, operation, err, nil) + <-process.waitBlock + if process.waitError != nil { + return makeProcessError(process, operation, process.waitError, nil) } - return nil } -// WaitTimeout waits for the process to exit or the duration to elapse. It returns -// false if timeout occurs. +// WaitTimeout waits for the process to exit or the duration to elapse. If the +// process has already exited returns the previous error (if any). If a timeout +// occurs returns `ErrTimeout`. func (process *Process) WaitTimeout(timeout time.Duration) (err error) { operation := "hcssshim::Process::WaitTimeout" process.logOperationBegin(operation) defer func() { process.logOperationEnd(operation, err) }() - err = waitForNotification(process.callbackNumber, hcsNotificationProcessExited, &timeout) - if err != nil { - return makeProcessError(process, operation, err, nil) + select { + case <-process.waitBlock: + if process.waitError != nil { + return makeProcessError(process, operation, process.waitError, nil) + } + return nil + case <-time.After(timeout): + return makeProcessError(process, operation, ErrTimeout, nil) } - - return nil } // ResizeConsole resizes the console of the process. 
diff --git a/internal/hcs/system.go b/internal/hcs/system.go index 8fa0c7b658..4af30ae40f 100644 --- a/internal/hcs/system.go +++ b/internal/hcs/system.go @@ -43,6 +43,9 @@ type System struct { callbackNumber uintptr logctx logrus.Fields + + waitBlock chan struct{} + waitError error } func newSystem(id string) *System { @@ -51,6 +54,7 @@ func newSystem(id string) *System { logctx: logrus.Fields{ logfields.ContainerID: id, }, + waitBlock: make(chan struct{}), } } @@ -121,6 +125,8 @@ func CreateComputeSystem(id string, hcsDocumentInterface interface{}) (_ *System return nil, makeSystemError(computeSystem, operation, hcsDocument, err, events) } + go computeSystem.waitBackground() + return computeSystem, nil } @@ -153,6 +159,7 @@ func OpenComputeSystem(id string) (_ *System, err error) { if err = computeSystem.registerCallback(); err != nil { return nil, makeSystemError(computeSystem, operation, "", err, nil) } + go computeSystem.waitBackground() return computeSystem, nil } @@ -335,48 +342,65 @@ func (computeSystem *System) Terminate() (err error) { return nil } -// Wait synchronously waits for the compute system to shutdown or terminate. +// waitBackground waits for the compute system exit notification. Once received +// sets `computeSystem.waitError` (if any) and unblocks all `Wait`, +// `WaitExpectedError`, and `WaitTimeout` calls. +// +// This MUST be called exactly once per `computeSystem.handle` but `Wait`, +// `WaitExpectedError`, and `WaitTimeout` are safe to call multiple times. +func (computeSystem *System) waitBackground() { + computeSystem.waitError = waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, nil) + close(computeSystem.waitBlock) +} + +// Wait synchronously waits for the compute system to shutdown or terminate. If +// the compute system has already exited returns the previous error (if any). 
func (computeSystem *System) Wait() (err error) { operation := "hcsshim::ComputeSystem::Wait" computeSystem.logOperationBegin(operation) defer func() { computeSystem.logOperationEnd(operation, err) }() - err = waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, nil) - if err != nil { - return makeSystemError(computeSystem, "Wait", "", err, nil) + <-computeSystem.waitBlock + if computeSystem.waitError != nil { + return makeSystemError(computeSystem, "Wait", "", computeSystem.waitError, nil) } return nil } // WaitExpectedError synchronously waits for the compute system to shutdown or -// terminate, and ignores the passed error if it occurs. +// terminate and returns the error (if any) as long as it does not match +// `expected`. If the compute system has already exited returns the previous +// error (if any) as long as it does not match `expected`. func (computeSystem *System) WaitExpectedError(expected error) (err error) { operation := "hcsshim::ComputeSystem::WaitExpectedError" computeSystem.logOperationBegin(operation) defer func() { computeSystem.logOperationEnd(operation, err) }() - err = waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, nil) - if err != nil && getInnerError(err) != expected { - return makeSystemError(computeSystem, "WaitExpectedError", "", err, nil) + <-computeSystem.waitBlock + if computeSystem.waitError != nil && getInnerError(computeSystem.waitError) != expected { + return makeSystemError(computeSystem, "WaitExpectedError", "", computeSystem.waitError, nil) } - return nil } -// WaitTimeout synchronously waits for the compute system to terminate or the duration to elapse. -// If the timeout expires, IsTimeout(err) == true +// WaitTimeout synchronously waits for the compute system to terminate or the +// duration to elapse. If the timeout expires, `IsTimeout(err) == true`. If +// the compute system has already exited returns the previous error (if any). 
func (computeSystem *System) WaitTimeout(timeout time.Duration) (err error) { operation := "hcsshim::ComputeSystem::WaitTimeout" computeSystem.logOperationBegin(operation) defer func() { computeSystem.logOperationEnd(operation, err) }() - err = waitForNotification(computeSystem.callbackNumber, hcsNotificationSystemExited, &timeout) - if err != nil { - return makeSystemError(computeSystem, "WaitTimeout", "", err, nil) + select { + case <-computeSystem.waitBlock: + if computeSystem.waitError != nil { + return makeSystemError(computeSystem, "WaitTimeout", "", computeSystem.waitError, nil) + } + return nil + case <-time.After(timeout): + return makeSystemError(computeSystem, "WaitTimeout", "", ErrTimeout, nil) } - - return nil } func (computeSystem *System) Properties(types ...schema1.PropertyType) (_ *schema1.ContainerProperties, err error) { @@ -519,6 +543,7 @@ func (computeSystem *System) CreateProcess(c interface{}) (_ *Process, err error if err = process.registerCallback(); err != nil { return nil, makeSystemError(computeSystem, "CreateProcess", "", err, nil) } + go process.waitBackground() return process, nil } @@ -557,6 +582,7 @@ func (computeSystem *System) OpenProcess(pid int) (_ *Process, err error) { if err = process.registerCallback(); err != nil { return nil, makeSystemError(computeSystem, "OpenProcess", "", err, nil) } + go process.waitBackground() return process, nil }