diff --git a/cmd/nerdctl/container_logs_test.go b/cmd/nerdctl/container_logs_test.go index 3f78ba8e2ce..a107982182c 100644 --- a/cmd/nerdctl/container_logs_test.go +++ b/cmd/nerdctl/container_logs_test.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "runtime" "strings" "testing" "time" @@ -143,3 +144,55 @@ func TestLogsWithFailingContainer(t *testing.T) { base.Cmd("logs", "-f", containerName).AssertNoOut("baz") base.Cmd("rm", "-f", containerName).AssertOK() } + +func TestLogsWithRunningContainer(t *testing.T) { + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", containerName).Run() + expected := []string{} + for i := 1; i <= 10; i++ { + expected = append(expected, fmt.Sprint(i)) + } + + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK() + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + base.Cmd("rm", "-f", containerName).AssertOK() +} + +func TestLogsWithoutNewlineOrEOF(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", containerName).Run() + expected := []string{"Hello World!", "There is no newline"} + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + base.Cmd("rm", "-f", containerName).AssertOK() +} + +func TestLogsAfterRestartingContainer(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem : The requested operation for attach namespace failed.: unknown") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", containerName).Run() + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + expected := []string{"Hello World!", "There is no newline"} + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + // restart and check logs again + base.Cmd("start", containerName) + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + base.Cmd("rm", "-f", containerName).AssertOK() +} diff --git a/pkg/cmd/container/create.go b/pkg/cmd/container/create.go index c39d036c628..1c788720c56 100644 --- a/pkg/cmd/container/create.go +++ b/pkg/cmd/container/create.go @@ -161,7 +161,7 @@ func Create(ctx context.Context, client *containerd.Client, args []string, netMa // 1, nerdctl run --name demo -it imagename // 2, ctrl + c to stop demo container // 3, nerdctl start/restart demo - logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace) + logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace, options.GOptions.Address) if err != nil { return nil, nil, err } @@ -661,8 +661,9 @@ func writeCIDFile(path, id string) error { } // generateLogConfig creates a LogConfig for the current container store -func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns string) (logConfig logging.LogConfig, err error) { +func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns, hostAddress string) (logConfig logging.LogConfig, err error) { var u *url.URL + logConfig.HostAddress = hostAddress if u, err = url.Parse(logDriver); err == nil && u.Scheme != "" { logConfig.LogURI = logDriver } else { diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index 239074c36cd..e24454d0795 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -23,9 +23,10 @@ import ( "os/signal" "syscall" + "github.com/containerd/log" + "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" - "github.com/containerd/log" "github.com/containerd/nerdctl/pkg/api/types" "github.com/containerd/nerdctl/pkg/api/types/cri" "github.com/containerd/nerdctl/pkg/clientutil" @@ -90,7 +91,11 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti // Setup goroutine to send stop event if container task finishes: go func() { <-waitCh - log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.L.WithError(err).Error("failed to wait for logger shutdown") + } + log.L.Debugf("container task has finished, sending kill signal to log viewer") stopChannel <- os.Interrupt }() } diff --git a/pkg/dnsutil/hostsstore/hostsstore.go b/pkg/dnsutil/hostsstore/hostsstore.go index 40a7726e66d..d865aae2ac3 100644 --- a/pkg/dnsutil/hostsstore/hostsstore.go +++ b/pkg/dnsutil/hostsstore/hostsstore.go @@ -73,7 +73,7 @@ func AllocHostsFile(dataStore, ns, id string) (string, error) { fn := func() error { return ensureFile(path) } - err := lockutil.WithDirLock(lockDir, fn) + err := lockutil.WithLock(lockDir, fn) return path, err } @@ -86,7 +86,7 @@ func DeallocHostsFile(dataStore, ns, id string) error { fn := func() error { return os.RemoveAll(dirToBeRemoved) } - return lockutil.WithDirLock(lockDir, fn) + return lockutil.WithLock(lockDir, fn) } func NewStore(dataStore string) (Store, error) { @@ -135,7 +135,7 @@ func (x *store) Acquire(meta Meta) error { } return newUpdater(meta.ID, x.hostsD).update() } - return lockutil.WithDirLock(x.hostsD, fn) + return lockutil.WithLock(x.hostsD, fn) } // Release is triggered by Poststop hooks. @@ -155,7 +155,7 @@ func (x *store) Release(ns, id string) error { } return newUpdater(id, x.hostsD).update() } - return lockutil.WithDirLock(x.hostsD, fn) + return lockutil.WithLock(x.hostsD, fn) } func (x *store) Update(ns, id, newName string) error { @@ -179,5 +179,5 @@ func (x *store) Update(ns, id, newName string) error { } return newUpdater(meta.ID, x.hostsD).update() } - return lockutil.WithDirLock(x.hostsD, fn) + return lockutil.WithLock(x.hostsD, fn) } diff --git a/pkg/lockutil/lockutil_unix.go b/pkg/lockutil/lockutil_unix.go index dbabcbc9b0f..01c9be133be 100644 --- a/pkg/lockutil/lockutil_unix.go +++ b/pkg/lockutil/lockutil_unix.go @@ -26,18 +26,18 @@ import ( "golang.org/x/sys/unix" ) -func WithDirLock(dir string, fn func() error) error { - dirFile, err := os.Open(dir) +func WithLock(name string, fn func() error) error { + dirFile, err := os.Open(name) if err != nil { return err } defer dirFile.Close() if err := Flock(dirFile, unix.LOCK_EX); err != nil { - return fmt.Errorf("failed to lock %q: %w", dir, err) + return fmt.Errorf("failed to lock %q: %w", name, err) } defer func() { if err := Flock(dirFile, unix.LOCK_UN); err != nil { - log.L.WithError(err).Errorf("failed to unlock %q", dir) + log.L.WithError(err).Errorf("failed to unlock %q", name) } }() return fn() diff --git a/pkg/lockutil/lockutil_windows.go b/pkg/lockutil/lockutil_windows.go index 85658167443..1a293ca8c0e 100644 --- a/pkg/lockutil/lockutil_windows.go +++ b/pkg/lockutil/lockutil_windows.go @@ -24,8 +24,8 @@ import ( "golang.org/x/sys/windows" ) -func WithDirLock(dir string, fn func() error) error { - dirFile, err := os.OpenFile(dir+".lock", os.O_CREATE, 0644) +func WithLock(name string, fn func() error) error { + dirFile, err := os.OpenFile(name+".lock", os.O_CREATE, 0644) if err != nil { return err } @@ -33,12 +33,12 @@ func WithDirLock(dir string, fn func() error) error { // see https://msdn.microsoft.com/en-us/library/windows/desktop/aa365203(v=vs.85).aspx // 1 lock immediately if err = windows.LockFileEx(windows.Handle(dirFile.Fd()), 1, 0, 1, 0, &windows.Overlapped{}); err != nil { - return fmt.Errorf("failed to lock %q: %w", dir, err) + return fmt.Errorf("failed to lock %q: %w", name, err) } defer func() { if err := windows.UnlockFileEx(windows.Handle(dirFile.Fd()), 0, 1, 0, &windows.Overlapped{}); err != nil { - log.L.WithError(err).Errorf("failed to unlock %q", dir) + log.L.WithError(err).Errorf("failed to unlock %q", name) } }() return fn() diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index f59936f8f18..1c40f845cb3 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -27,10 +27,13 @@ import ( "path/filepath" "sort" "sync" + "time" + "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime/v2/logging" "github.com/containerd/log" + "github.com/containerd/nerdctl/pkg/lockutil" ) const ( @@ -113,9 +116,10 @@ func Main(argv2 string) error { // LogConfig is marshalled as "log-config.json" type LogConfig struct { - Driver string `json:"driver"` - Opts map[string]string `json:"opts,omitempty"` - LogURI string `json:"-"` + Driver string `json:"driver"` + Opts map[string]string `json:"opts,omitempty"` + HostAddress string `json:"host"` + LogURI string `json:"-"` } // LogConfigFilePath returns the path of log-config.json @@ -140,10 +144,73 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) { return logConfig, nil } -func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Config) error { +func getLockPath(dataStore, ns, id string) string { + return filepath.Join(dataStore, "containers", ns, id, "logger-lock") +} + +// WaitForLogger waits until the logger has finished executing and processing container logs +func WaitForLogger(dataStore, ns, id string) error { + return lockutil.WithLock(getLockPath(dataStore, ns, id), func() error { + return nil + }) +} + +// getContainerWait loads the container from ID and returns its wait channel +func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace)) + if err != nil { + return nil, err + } + con, err := client.LoadContainer(ctx, config.ID) + if err != nil { + return nil, err + } + task, err := con.Task(ctx, nil) + if err != nil { + return nil, err + } + + // If task was not found, it's possible that the container runtime is still being created. + // Retry every 100ms. + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil, errors.New("timed out waiting for container task to start") + case <-ticker.C: + task, err = con.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + return task.Wait(ctx) + } + } +} + +func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error { if err := driver.PreProcess(dataStore, config); err != nil { return err } + + // initialize goroutines to copy stdout and stderr streams to a closable pipe + stdoutR, stdoutW := io.Pipe() + stderrR, stderrW := io.Pipe() + copyStream := func(reader io.Reader, writer *io.PipeWriter) { + // copy using a buffer of size 32K + buf := make([]byte, 32<<10) + _, err := io.CopyBuffer(writer, reader, buf) + if err != nil { + log.L.Errorf("failed to copy stream: %s", err) + } + } + go copyStream(config.Stdout, stdoutW) + go copyStream(config.Stderr, stderrW) + + // scan and process logs from pipes var wg sync.WaitGroup wg.Add(3) stdout := make(chan string, 10000) @@ -161,12 +228,24 @@ func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Conf } } - go processLogFunc(config.Stdout, stdout) - go processLogFunc(config.Stderr, stderr) + go processLogFunc(stdoutR, stdout) + go processLogFunc(stderrR, stderr) go func() { defer wg.Done() driver.Process(stdout, stderr) }() + go func() { + // close stdout and stderr upon container exit + defer stdoutW.Close() + defer stderrW.Close() + + exitCh, err := getContainerWait(ctx, hostAddress, config) + if err != nil { + log.L.Errorf("failed to get container task wait channel: %v", err) + return + } + <-exitCh + }() wg.Wait() return driver.PostProcess() } @@ -175,7 +254,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if dataStore == "" { return nil, errors.New("got empty data store") } - return func(_ context.Context, config *logging.Config, ready func() error) error { + return func(ctx context.Context, config *logging.Config, ready func() error) error { if config.Namespace == "" || config.ID == "" { return errors.New("got invalid config") } @@ -189,11 +268,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if err != nil { return err } - if err := ready(); err != nil { + + lockFile := getLockPath(dataStore, config.Namespace, config.ID) + f, err := os.Create(lockFile) + if err != nil { return err } + defer f.Close() + + // the logger will obtain an exclusive lock on a file until the container is + // stopped and the driver has finished processing all output, + // so that waiting log viewers can be signalled when the process is complete. + return lockutil.WithLock(lockFile, func() error { + if err := ready(); err != nil { + return err + } - return loggingProcessAdapter(driver, dataStore, config) + return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config) + }) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 return err diff --git a/pkg/mountutil/volumestore/volumestore.go b/pkg/mountutil/volumestore/volumestore.go index f645a167e71..7c8df8d32b6 100644 --- a/pkg/mountutil/volumestore/volumestore.go +++ b/pkg/mountutil/volumestore/volumestore.go @@ -110,7 +110,7 @@ func (vs *volumeStore) Create(name string, labels []string) (*native.Volume, err return os.WriteFile(volFilePath, labelsJSON, 0644) } - if err := lockutil.WithDirLock(vs.dir, fn); err != nil { + if err := lockutil.WithLock(vs.dir, fn); err != nil { return nil, err } @@ -188,7 +188,7 @@ func (vs *volumeStore) Remove(names []string) ([]string, error) { } return nil } - err := lockutil.WithDirLock(vs.dir, fn) + err := lockutil.WithLock(vs.dir, fn) return removed, err } diff --git a/pkg/namestore/namestore.go b/pkg/namestore/namestore.go index 1ab9b4b3b18..df4ee86faf7 100644 --- a/pkg/namestore/namestore.go +++ b/pkg/namestore/namestore.go @@ -61,7 +61,7 @@ func (x *nameStore) Acquire(name, id string) error { } return os.WriteFile(fileName, []byte(id), 0600) } - return lockutil.WithDirLock(x.dir, fn) + return lockutil.WithLock(x.dir, fn) } func (x *nameStore) Release(name, id string) error { @@ -88,7 +88,7 @@ func (x *nameStore) Release(name, id string) error { } return os.RemoveAll(fileName) } - return lockutil.WithDirLock(x.dir, fn) + return lockutil.WithLock(x.dir, fn) } func (x *nameStore) Rename(oldName, id, newName string) error { @@ -113,5 +113,5 @@ func (x *nameStore) Rename(oldName, id, newName string) error { } return os.Rename(oldFileName, newFileName) } - return lockutil.WithDirLock(x.dir, fn) + return lockutil.WithLock(x.dir, fn) } diff --git a/pkg/netutil/netutil.go b/pkg/netutil/netutil.go index f6e0079cd9f..1181e667f47 100644 --- a/pkg/netutil/netutil.go +++ b/pkg/netutil/netutil.go @@ -264,7 +264,7 @@ func (e *CNIEnv) CreateNetwork(opts CreateOptions) (*NetworkConfig, error) { //n } return e.writeNetworkConfig(net) } - err = lockutil.WithDirLock(e.NetconfPath, fn) + err = lockutil.WithLock(e.NetconfPath, fn) if err != nil { return nil, err } @@ -278,7 +278,7 @@ func (e *CNIEnv) RemoveNetwork(net *NetworkConfig) error { } return net.clean() } - return lockutil.WithDirLock(e.NetconfPath, fn) + return lockutil.WithLock(e.NetconfPath, fn) } // GetDefaultNetworkConfig checks whether the default network exists