diff --git a/cmd/nerdctl/container/container_logs_test.go b/cmd/nerdctl/container/container_logs_test.go index 449e780345f..ea2be65d6aa 100644 --- a/cmd/nerdctl/container/container_logs_test.go +++ b/cmd/nerdctl/container/container_logs_test.go @@ -164,6 +164,55 @@ func TestLogsWithFailingContainer(t *testing.T) { base.Cmd("rm", "-f", containerName).AssertOK() } +func TestLogsWithRunningContainer(t *testing.T) { + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", "-f", containerName).Run() + expected := make([]string, 10) + for i := 0; i < 10; i++ { + expected[i] = fmt.Sprint(i + 1) + } + + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK() + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) +} + +func TestLogsWithoutNewlineOrEOF(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", "-f", containerName).Run() + expected := []string{"Hello World!", "There is no newline"} + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) +} + +func TestLogsAfterRestartingContainer(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem : The requested operation for attach namespace failed.: unknown") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", "-f", containerName).Run() + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + expected := []string{"Hello World!", "There is no newline"} + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + // restart and check logs again + base.Cmd("start", containerName) + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) +} + func TestLogsWithForegroundContainers(t *testing.T) { testCase := nerdtest.Setup() // dual logging is not supported on Windows diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index dd523ed1774..9bf30018758 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -91,6 +91,10 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti // Setup goroutine to send stop event if container task finishes: go func() { <-waitCh + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") + } log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") stopChannel <- os.Interrupt }() diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 15663acc868..4966ad21254 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -30,12 +30,14 @@ import ( "sync" "time" + containerd "github.com/containerd/containerd/v2/client" "github.com/fsnotify/fsnotify" "github.com/muesli/cancelreader" "github.com/containerd/containerd/v2/core/runtime/v2/logging" "github.com/containerd/errdefs" "github.com/containerd/log" + "github.com/containerd/nerdctl/v2/pkg/lockutil" ) const ( @@ -149,7 +151,60 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) { return logConfig, nil } -func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, config *logging.Config) error { +func getLockPath(dataStore, ns, id string) string { + return filepath.Join(dataStore, "containers", ns, id, "logger-lock") +} + +// WaitForLogger waits until the logger has finished executing and processing container logs +func WaitForLogger(dataStore, ns, id string) error { + return lockutil.WithDirLock(getLockPath(dataStore, ns, id), func() error { + return nil + }) +} + +func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + client, err := containerd.New(address, containerd.WithDefaultNamespace(config.Namespace)) + if err != nil { + return nil, err + } + con, err := client.LoadContainer(ctx, config.ID) + if err != nil { + return nil, err + } + + task, err := con.Task(ctx, nil) + if err == nil { + return task.Wait(ctx) + } + if !errdefs.IsNotFound(err) { + return nil, err + } + + // If task was not found, it's possible that the container runtime is still being created. + // Retry every 100ms. + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil, errors.New("timed out waiting for container task to start") + case <-ticker.C: + task, err = con.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + return task.Wait(ctx) + } + } +} + +type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) + +func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error { if err := driver.PreProcess(ctx, dataStore, config); err != nil { return err } @@ -168,6 +223,20 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, stderrR.Cancel() }() + // initialize goroutines to copy stdout and stderr streams to a closable pipe + pipeStdoutR, pipeStdoutW := io.Pipe() + pipeStderrR, pipeStderrW := io.Pipe() + copyStream := func(reader io.Reader, writer *io.PipeWriter) { + // copy using a buffer of size 32K + buf := make([]byte, 32<<10) + _, err := io.CopyBuffer(writer, reader, buf) + if err != nil { + log.G(ctx).Errorf("failed to copy stream: %s", err) + } + } + go copyStream(stdoutR, pipeStdoutW) + go copyStream(stderrR, pipeStderrW) + var wg sync.WaitGroup wg.Add(3) stdout := make(chan string, 10000) @@ -182,7 +251,6 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, for err == nil { var s string s, err = r.ReadString('\n') - if len(s) > 0 { dataChan <- strings.TrimSuffix(s, "\n") } @@ -192,12 +260,24 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, } } } - go processLogFunc(stdoutR, stdout) - go processLogFunc(stderrR, stderr) + go processLogFunc(pipeStdoutR, stdout) + go processLogFunc(pipeStderrR, stderr) go func() { defer wg.Done() driver.Process(stdout, stderr) }() + go func() { + // close pipeStdoutW and pipeStderrW upon container exit + defer pipeStdoutW.Close() + defer pipeStderrW.Close() + + exitCh, err := getContainerWait(ctx, address, config) + if err != nil { + log.G(ctx).Errorf("failed to get container task wait channel: %v", err) + return + } + <-exitCh + }() wg.Wait() return driver.PostProcess() } @@ -220,11 +300,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if err != nil { return err } - if err := ready(); err != nil { + + loggerLock := getLockPath(dataStore, config.Namespace, config.ID) + f, err := os.Create(loggerLock) + if err != nil { return err } - - return loggingProcessAdapter(ctx, driver, dataStore, config) + defer f.Close() + + // the logger will obtain an exclusive lock on a file until the container is + // stopped and the driver has finished processing all output, + // so that waiting log viewers can be signalled when the process is complete. + return lockutil.WithDirLock(loggerLock, func() error { + if err := ready(); err != nil { + return err + } + // getContainerWait is extracted as parameter to allow mocking in tests. + return loggingProcessAdapter(ctx, driver, dataStore, logConfig.Address, getContainerWait, config) + }) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 return err diff --git a/pkg/logging/logging_test.go b/pkg/logging/logging_test.go index 175f1b3e64b..da0d535b074 100644 --- a/pkg/logging/logging_test.go +++ b/pkg/logging/logging_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/runtime/v2/logging" ) @@ -78,7 +79,14 @@ func TestLoggingProcessAdapter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := loggingProcessAdapter(ctx, driver, "testDataStore", config) + var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + exitChan := make(chan containerd.ExitStatus, 1) + time.Sleep(50 * time.Millisecond) + exitChan <- containerd.ExitStatus{} + return exitChan, nil + } + + err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config) if err != nil { t.Fatal(err) }