From 27b91ecf7517845a1bf01b26afe438f04cdf4840 Mon Sep 17 00:00:00 2001 From: fahed dorgaa Date: Thu, 6 Mar 2025 15:08:27 +0100 Subject: [PATCH] ensure logger completion after container exit Introduced mechanisms to ensure logger completion after container exit using file locks and added robust handling for log streams. Updated and added tests to validate functionality, including log behavior with running, non-terminated, and restarted containers. Signed-off-by: fahed dorgaa --- cmd/nerdctl/container/container_logs_test.go | 49 +++++++++ pkg/cmd/container/logs.go | 4 + pkg/logging/logging.go | 107 +++++++++++++++++-- pkg/logging/logging_test.go | 10 +- 4 files changed, 162 insertions(+), 8 deletions(-) diff --git a/cmd/nerdctl/container/container_logs_test.go b/cmd/nerdctl/container/container_logs_test.go index 449e780345f..ea2be65d6aa 100644 --- a/cmd/nerdctl/container/container_logs_test.go +++ b/cmd/nerdctl/container/container_logs_test.go @@ -164,6 +164,55 @@ func TestLogsWithFailingContainer(t *testing.T) { base.Cmd("rm", "-f", containerName).AssertOK() } +func TestLogsWithRunningContainer(t *testing.T) { + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", "-f", containerName).Run() + expected := make([]string, 10) + for i := 0; i < 10; i++ { + expected[i] = fmt.Sprint(i + 1) + } + + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK() + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) +} + +func TestLogsWithoutNewlineOrEOF(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", "-f", containerName).Run() + expected := []string{"Hello World!", "There is no newline"} + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) +} + +func TestLogsAfterRestartingContainer(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem : The requested operation for attach namespace failed.: unknown") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", "-f", containerName).Run() + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + expected := []string{"Hello World!", "There is no newline"} + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + // restart and check logs again + base.Cmd("start", containerName) + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) +} + func TestLogsWithForegroundContainers(t *testing.T) { testCase := nerdtest.Setup() // dual logging is not supported on Windows diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index dd523ed1774..9bf30018758 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -91,6 +91,10 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti // Setup goroutine to send stop event if container task finishes: go func() { <-waitCh + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") + } log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") stopChannel <- os.Interrupt }() diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 15663acc868..4966ad21254 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -30,12 +30,14 @@ import ( "sync" "time" + containerd "github.com/containerd/containerd/v2/client" "github.com/fsnotify/fsnotify" "github.com/muesli/cancelreader" "github.com/containerd/containerd/v2/core/runtime/v2/logging" "github.com/containerd/errdefs" "github.com/containerd/log" + "github.com/containerd/nerdctl/v2/pkg/lockutil" ) const ( @@ -149,7 +151,60 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) { return logConfig, nil } -func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, config *logging.Config) error { +func getLockPath(dataStore, ns, id string) string { + return filepath.Join(dataStore, "containers", ns, id, "logger-lock") +} + +// WaitForLogger waits until the logger has finished executing and processing container logs +func WaitForLogger(dataStore, ns, id string) error { + return lockutil.WithDirLock(getLockPath(dataStore, ns, id), func() error { + return nil + }) +} + +func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + client, err := containerd.New(address, containerd.WithDefaultNamespace(config.Namespace)) + if err != nil { + return nil, err + } + con, err := client.LoadContainer(ctx, config.ID) + if err != nil { + return nil, err + } + + task, err := con.Task(ctx, nil) + if err == nil { + return task.Wait(ctx) + } + if !errdefs.IsNotFound(err) { + return nil, err + } + + // If task was not found, it's possible that the container runtime is still being created. + // Retry every 100ms. + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil, errors.New("timed out waiting for container task to start") + case <-ticker.C: + task, err = con.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + return task.Wait(ctx) + } + } +} + +type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) + +func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error { if err := driver.PreProcess(ctx, dataStore, config); err != nil { return err } @@ -168,6 +223,20 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, stderrR.Cancel() }() + // initialize goroutines to copy stdout and stderr streams to a closable pipe + pipeStdoutR, pipeStdoutW := io.Pipe() + pipeStderrR, pipeStderrW := io.Pipe() + copyStream := func(reader io.Reader, writer *io.PipeWriter) { + // copy using a buffer of size 32K + buf := make([]byte, 32<<10) + _, err := io.CopyBuffer(writer, reader, buf) + if err != nil { + log.G(ctx).Errorf("failed to copy stream: %s", err) + } + } + go copyStream(stdoutR, pipeStdoutW) + go copyStream(stderrR, pipeStderrW) + var wg sync.WaitGroup wg.Add(3) stdout := make(chan string, 10000) @@ -182,7 +251,6 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, for err == nil { var s string s, err = r.ReadString('\n') - if len(s) > 0 { dataChan <- strings.TrimSuffix(s, "\n") } @@ -192,12 +260,24 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, } } } - go processLogFunc(stdoutR, stdout) - go processLogFunc(stderrR, stderr) + go processLogFunc(pipeStdoutR, stdout) + go processLogFunc(pipeStderrR, stderr) go func() { defer wg.Done() driver.Process(stdout, stderr) }() + go func() { + // close pipeStdoutW and pipeStderrW upon container exit + defer pipeStdoutW.Close() + defer pipeStderrW.Close() + + exitCh, err := getContainerWait(ctx, address, config) + if err != nil { + log.G(ctx).Errorf("failed to get container task wait channel: %v", err) + return + } + <-exitCh + }() wg.Wait() return driver.PostProcess() } @@ -220,11 +300,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if err != nil { return err } - if err := ready(); err != nil { + + loggerLock := getLockPath(dataStore, config.Namespace, config.ID) + f, err := os.Create(loggerLock) + if err != nil { return err } - - return loggingProcessAdapter(ctx, driver, dataStore, config) + defer f.Close() + + // the logger will obtain an exclusive lock on a file until the container is + // stopped and the driver has finished processing all output, + // so that waiting log viewers can be signalled when the process is complete. + return lockutil.WithDirLock(loggerLock, func() error { + if err := ready(); err != nil { + return err + } + // getContainerWait is extracted as parameter to allow mocking in tests. + return loggingProcessAdapter(ctx, driver, dataStore, logConfig.Address, getContainerWait, config) + }) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 return err diff --git a/pkg/logging/logging_test.go b/pkg/logging/logging_test.go index 175f1b3e64b..da0d535b074 100644 --- a/pkg/logging/logging_test.go +++ b/pkg/logging/logging_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/runtime/v2/logging" ) @@ -78,7 +79,14 @@ func TestLoggingProcessAdapter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := loggingProcessAdapter(ctx, driver, "testDataStore", config) + var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + exitChan := make(chan containerd.ExitStatus, 1) + time.Sleep(50 * time.Millisecond) + exitChan <- containerd.ExitStatus{} + return exitChan, nil + } + + err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config) if err != nil { t.Fatal(err) }