From c06c2ab4a24a6c636719e52a48c3136387e79b4c Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Thu, 29 Jun 2023 02:29:06 -0700 Subject: [PATCH 1/6] fix: shutdown logger after container process exits Signed-off-by: Mrudul Harwani fixes to squash Signed-off-by: fahed dorgaa fixes to squash Signed-off-by: fahed dorgaa --- pkg/cmd/container/create.go | 9 +++--- pkg/logging/json_logger.go | 56 +++++++++++++++++++-------------- pkg/logging/logging.go | 63 ++++++++++++++++++++++++++++++++----- 3 files changed, 92 insertions(+), 36 deletions(-) diff --git a/pkg/cmd/container/create.go b/pkg/cmd/container/create.go index c39d036c628..680d2b7ea5c 100644 --- a/pkg/cmd/container/create.go +++ b/pkg/cmd/container/create.go @@ -161,7 +161,7 @@ func Create(ctx context.Context, client *containerd.Client, args []string, netMa // 1, nerdctl run --name demo -it imagename // 2, ctrl + c to stop demo container // 3, nerdctl start/restart demo - logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace) + logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, &options.GOptions) if err != nil { return nil, nil, err } @@ -661,8 +661,9 @@ func writeCIDFile(path, id string) error { } // generateLogConfig creates a LogConfig for the current container store -func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns string) (logConfig logging.LogConfig, err error) { +func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, gOpt *types.GlobalCommandOptions) (logConfig logging.LogConfig, err error) { var u *url.URL + logConfig.HostAddress = gOpt.Address if u, err = url.Parse(logDriver); err == nil && u.Scheme != "" { logConfig.LogURI = logDriver } else { @@ -680,7 +681,7 @@ func generateLogConfig(dataStore string, id string, logDriver string, logOpt []s if err != nil { return } - if err = logDriverInst.Init(dataStore, ns, id); err != nil { + if err = logDriverInst.Init(dataStore, gOpt.Namespace, id); err != nil { return } @@ -689,7 +690,7 @@ func generateLogConfig(dataStore string, id string, logDriver string, logOpt []s return } - logConfigFilePath := logging.LogConfigFilePath(dataStore, ns, id) + logConfigFilePath := logging.LogConfigFilePath(dataStore, gOpt.Namespace, id) if err = os.WriteFile(logConfigFilePath, logConfigB, 0600); err != nil { return } diff --git a/pkg/logging/json_logger.go b/pkg/logging/json_logger.go index 88c8555d3e0..d88f167fcdb 100644 --- a/pkg/logging/json_logger.go +++ b/pkg/logging/json_logger.go @@ -157,37 +157,45 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) } fin.Close() + + readFromLastPos := func() error { + // Re-open the file and seek to the last-consumed offset. + fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) + } + _, err = fin.Seek(lastPos, 0) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) + } + + err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) + } + + // Record current file seek position before looping again. + lastPos, err = fin.Seek(0, io.SeekCurrent) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) + } + fin.Close() + return nil + } + for { select { case <-stopChannel: log.L.Debugf("received stop signal while re-reading JSON logfile, returning") return nil default: - // Re-open the file and seek to the last-consumed offset. - fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) + if err = readFromLastPos(); err != nil { + return err } - _, err = fin.Seek(lastPos, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) - } - - err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) - } - - // Record current file seek position before looping again. - lastPos, err = fin.Seek(0, io.SeekCurrent) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) - } - fin.Close() } // Give the OS a second to breathe before re-opening the file: time.Sleep(time.Second) diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index f59936f8f18..f2b107ba6f3 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -28,6 +28,7 @@ import ( "sort" "sync" + "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime/v2/logging" "github.com/containerd/log" @@ -113,9 +114,10 @@ func Main(argv2 string) error { // LogConfig is marshalled as "log-config.json" type LogConfig struct { - Driver string `json:"driver"` - Opts map[string]string `json:"opts,omitempty"` - LogURI string `json:"-"` + Driver string `json:"driver"` + Opts map[string]string `json:"opts,omitempty"` + HostAddress string `json:"host"` + LogURI string `json:"-"` } // LogConfigFilePath returns the path of log-config.json @@ -140,10 +142,43 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) { return logConfig, nil } -func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Config) error { +// getContainerWait loads the container from ID and returns its wait channel +func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace)) + if err != nil { + return nil, err + } + con, err := client.LoadContainer(ctx, config.ID) + if err != nil { + return nil, err + } + task, err := con.Task(ctx, nil) + if err != nil { + return nil, err + } + return task.Wait(ctx) +} + +func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error { if err := driver.PreProcess(dataStore, config); err != nil { return err } + + // initialize goroutines to copy stdout and stderr streams to a closable pipe + stdoutR, stdoutW := io.Pipe() + stderrR, stderrW := io.Pipe() + copyStream := func(reader io.Reader, writer *io.PipeWriter) { + // copy using a buffer of size 16K + buf := make([]byte, 16*1024) + _, err := io.CopyBuffer(writer, reader, buf) + if err != nil { + log.L.Errorf("failed to copy stream: %s", err) + } + } + go copyStream(config.Stdout, stdoutW) + go copyStream(config.Stderr, stderrW) + + // scan and process logs from pipes var wg sync.WaitGroup wg.Add(3) stdout := make(chan string, 10000) @@ -161,12 +196,24 @@ func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Conf } } - go processLogFunc(config.Stdout, stdout) - go processLogFunc(config.Stderr, stderr) + go processLogFunc(stdoutR, stdout) + go processLogFunc(stderrR, stderr) go func() { defer wg.Done() driver.Process(stdout, stderr) }() + go func() { + // close stdout and stderr upon container exit + defer stdoutW.Close() + defer stderrW.Close() + + exitCh, err := getContainerWait(ctx, hostAddress, config) + if err != nil { + log.L.Errorf("failed to get container task wait channel: %v", err) + return + } + <-exitCh + }() wg.Wait() return driver.PostProcess() } @@ -175,7 +222,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if dataStore == "" { return nil, errors.New("got empty data store") } - return func(_ context.Context, config *logging.Config, ready func() error) error { + return func(ctx context.Context, config *logging.Config, ready func() error) error { if config.Namespace == "" || config.ID == "" { return errors.New("got invalid config") } @@ -193,7 +240,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { return err } - return loggingProcessAdapter(driver, dataStore, config) + return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 return err From af732856f68dd8cecfe7b48125d04926e438715a Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Thu, 29 Jun 2023 11:41:16 -0700 Subject: [PATCH 2/6] add tests for the logging changes Signed-off-by: Mrudul Harwani --- cmd/nerdctl/container_logs_test.go | 53 ++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/cmd/nerdctl/container_logs_test.go b/cmd/nerdctl/container_logs_test.go index 3f78ba8e2ce..a107982182c 100644 --- a/cmd/nerdctl/container_logs_test.go +++ b/cmd/nerdctl/container_logs_test.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "runtime" "strings" "testing" "time" @@ -143,3 +144,55 @@ func TestLogsWithFailingContainer(t *testing.T) { base.Cmd("logs", "-f", containerName).AssertNoOut("baz") base.Cmd("rm", "-f", containerName).AssertOK() } + +func TestLogsWithRunningContainer(t *testing.T) { + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", containerName).Run() + expected := []string{} + for i := 1; i <= 10; i++ { + expected = append(expected, fmt.Sprint(i)) + } + + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK() + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + base.Cmd("rm", "-f", containerName).AssertOK() +} + +func TestLogsWithoutNewlineOrEOF(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", containerName).Run() + expected := []string{"Hello World!", "There is no newline"} + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + base.Cmd("rm", "-f", containerName).AssertOK() +} + +func TestLogsAfterRestartingContainer(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem : The requested operation for attach namespace failed.: unknown") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", containerName).Run() + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + expected := []string{"Hello World!", "There is no newline"} + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + // restart and check logs again + base.Cmd("start", containerName) + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + base.Cmd("rm", "-f", containerName).AssertOK() +} From 37cddf88147256a18edcc69a08853991e412c036 Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Thu, 29 Jun 2023 12:39:56 -0700 Subject: [PATCH 3/6] wait after container exit in logs cmd instead of json_logger for unprocessed data Signed-off-by: Mrudul Harwani --- pkg/cmd/container/logs.go | 1 + pkg/logging/json_logger.go | 56 ++++++++++++++++---------------------- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index 239074c36cd..8ed508f4a4c 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -22,6 +22,7 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" diff --git a/pkg/logging/json_logger.go b/pkg/logging/json_logger.go index d88f167fcdb..88c8555d3e0 100644 --- a/pkg/logging/json_logger.go +++ b/pkg/logging/json_logger.go @@ -157,45 +157,37 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) } fin.Close() - - readFromLastPos := func() error { - // Re-open the file and seek to the last-consumed offset. - fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) - } - _, err = fin.Seek(lastPos, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) - } - - err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) - } - - // Record current file seek position before looping again. - lastPos, err = fin.Seek(0, io.SeekCurrent) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) - } - fin.Close() - return nil - } - for { select { case <-stopChannel: log.L.Debugf("received stop signal while re-reading JSON logfile, returning") return nil default: - if err = readFromLastPos(); err != nil { - return err + // Re-open the file and seek to the last-consumed offset. + fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) } + _, err = fin.Seek(lastPos, 0) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) + } + + err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) + } + + // Record current file seek position before looping again. + lastPos, err = fin.Seek(0, io.SeekCurrent) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) + } + fin.Close() } // Give the OS a second to breathe before re-opening the file: time.Sleep(time.Second) From a656830c7fa12fb4ac7da6fac69520edf8856d65 Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Wed, 19 Jul 2023 07:58:12 -0700 Subject: [PATCH 4/6] minor changes to address review feedback Signed-off-by: Mrudul Harwani --- pkg/cmd/container/create.go | 10 +++++----- pkg/logging/logging.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/cmd/container/create.go b/pkg/cmd/container/create.go index 680d2b7ea5c..1c788720c56 100644 --- a/pkg/cmd/container/create.go +++ b/pkg/cmd/container/create.go @@ -161,7 +161,7 @@ func Create(ctx context.Context, client *containerd.Client, args []string, netMa // 1, nerdctl run --name demo -it imagename // 2, ctrl + c to stop demo container // 3, nerdctl start/restart demo - logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, &options.GOptions) + logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace, options.GOptions.Address) if err != nil { return nil, nil, err } @@ -661,9 +661,9 @@ func writeCIDFile(path, id string) error { } // generateLogConfig creates a LogConfig for the current container store -func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, gOpt *types.GlobalCommandOptions) (logConfig logging.LogConfig, err error) { +func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns, hostAddress string) (logConfig logging.LogConfig, err error) { var u *url.URL - logConfig.HostAddress = gOpt.Address + logConfig.HostAddress = hostAddress if u, err = url.Parse(logDriver); err == nil && u.Scheme != "" { logConfig.LogURI = logDriver } else { @@ -681,7 +681,7 @@ func generateLogConfig(dataStore string, id string, logDriver string, logOpt []s if err != nil { return } - if err = logDriverInst.Init(dataStore, gOpt.Namespace, id); err != nil { + if err = logDriverInst.Init(dataStore, ns, id); err != nil { return } @@ -690,7 +690,7 @@ func generateLogConfig(dataStore string, id string, logDriver string, logOpt []s return } - logConfigFilePath := logging.LogConfigFilePath(dataStore, gOpt.Namespace, id) + logConfigFilePath := logging.LogConfigFilePath(dataStore, ns, id) if err = os.WriteFile(logConfigFilePath, logConfigB, 0600); err != nil { return } diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index f2b107ba6f3..158553b22ca 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -168,8 +168,8 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAd stdoutR, stdoutW := io.Pipe() stderrR, stderrW := io.Pipe() copyStream := func(reader io.Reader, writer *io.PipeWriter) { - // copy using a buffer of size 16K - buf := make([]byte, 16*1024) + // copy using a buffer of size 32K + buf := make([]byte, 32<<10) _, err := io.CopyBuffer(writer, reader, buf) if err != nil { log.L.Errorf("failed to copy stream: %s", err) From 816317f419059cd949eb6ba4ee50e056be403c92 Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Sun, 30 Jul 2023 21:44:46 -0700 Subject: [PATCH 5/6] implement method to wait for logger exit and rename WithDirLock to WithLock Signed-off-by: Mrudul Harwani --- pkg/cmd/container/logs.go | 10 +++++--- pkg/dnsutil/hostsstore/hostsstore.go | 10 ++++---- pkg/lockutil/lockutil_unix.go | 8 +++---- pkg/lockutil/lockutil_windows.go | 8 +++---- pkg/logging/logging.go | 29 ++++++++++++++++++++++-- pkg/mountutil/volumestore/volumestore.go | 4 ++-- pkg/namestore/namestore.go | 6 ++--- pkg/netutil/netutil.go | 4 ++-- 8 files changed, 54 insertions(+), 25 deletions(-) diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index 8ed508f4a4c..e24454d0795 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -22,11 +22,11 @@ import ( "os" "os/signal" "syscall" - "time" + + "github.com/containerd/log" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" - "github.com/containerd/log" "github.com/containerd/nerdctl/pkg/api/types" "github.com/containerd/nerdctl/pkg/api/types/cri" "github.com/containerd/nerdctl/pkg/clientutil" @@ -91,7 +91,11 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti // Setup goroutine to send stop event if container task finishes: go func() { <-waitCh - log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.L.WithError(err).Error("failed to wait for logger shutdown") + } + log.L.Debugf("container task has finished, sending kill signal to log viewer") stopChannel <- os.Interrupt }() } diff --git a/pkg/dnsutil/hostsstore/hostsstore.go b/pkg/dnsutil/hostsstore/hostsstore.go index 40a7726e66d..d865aae2ac3 100644 --- a/pkg/dnsutil/hostsstore/hostsstore.go +++ b/pkg/dnsutil/hostsstore/hostsstore.go @@ -73,7 +73,7 @@ func AllocHostsFile(dataStore, ns, id string) (string, error) { fn := func() error { return ensureFile(path) } - err := lockutil.WithDirLock(lockDir, fn) + err := lockutil.WithLock(lockDir, fn) return path, err } @@ -86,7 +86,7 @@ func DeallocHostsFile(dataStore, ns, id string) error { fn := func() error { return os.RemoveAll(dirToBeRemoved) } - return lockutil.WithDirLock(lockDir, fn) + return lockutil.WithLock(lockDir, fn) } func NewStore(dataStore string) (Store, error) { @@ -135,7 +135,7 @@ func (x *store) Acquire(meta Meta) error { } return newUpdater(meta.ID, x.hostsD).update() } - return lockutil.WithDirLock(x.hostsD, fn) + return lockutil.WithLock(x.hostsD, fn) } // Release is triggered by Poststop hooks. @@ -155,7 +155,7 @@ func (x *store) Release(ns, id string) error { } return newUpdater(id, x.hostsD).update() } - return lockutil.WithDirLock(x.hostsD, fn) + return lockutil.WithLock(x.hostsD, fn) } func (x *store) Update(ns, id, newName string) error { @@ -179,5 +179,5 @@ func (x *store) Update(ns, id, newName string) error { } return newUpdater(meta.ID, x.hostsD).update() } - return lockutil.WithDirLock(x.hostsD, fn) + return lockutil.WithLock(x.hostsD, fn) } diff --git a/pkg/lockutil/lockutil_unix.go b/pkg/lockutil/lockutil_unix.go index dbabcbc9b0f..01c9be133be 100644 --- a/pkg/lockutil/lockutil_unix.go +++ b/pkg/lockutil/lockutil_unix.go @@ -26,18 +26,18 @@ import ( "golang.org/x/sys/unix" ) -func WithDirLock(dir string, fn func() error) error { - dirFile, err := os.Open(dir) +func WithLock(name string, fn func() error) error { + dirFile, err := os.Open(name) if err != nil { return err } defer dirFile.Close() if err := Flock(dirFile, unix.LOCK_EX); err != nil { - return fmt.Errorf("failed to lock %q: %w", dir, err) + return fmt.Errorf("failed to lock %q: %w", name, err) } defer func() { if err := Flock(dirFile, unix.LOCK_UN); err != nil { - log.L.WithError(err).Errorf("failed to unlock %q", dir) + log.L.WithError(err).Errorf("failed to unlock %q", name) } }() return fn() diff --git a/pkg/lockutil/lockutil_windows.go b/pkg/lockutil/lockutil_windows.go index 85658167443..1a293ca8c0e 100644 --- a/pkg/lockutil/lockutil_windows.go +++ b/pkg/lockutil/lockutil_windows.go @@ -24,8 +24,8 @@ import ( "golang.org/x/sys/windows" ) -func WithDirLock(dir string, fn func() error) error { - dirFile, err := os.OpenFile(dir+".lock", os.O_CREATE, 0644) +func WithLock(name string, fn func() error) error { + dirFile, err := os.OpenFile(name+".lock", os.O_CREATE, 0644) if err != nil { return err } @@ -33,12 +33,12 @@ func WithDirLock(dir string, fn func() error) error { // see https://msdn.microsoft.com/en-us/library/windows/desktop/aa365203(v=vs.85).aspx // 1 lock immediately if err = windows.LockFileEx(windows.Handle(dirFile.Fd()), 1, 0, 1, 0, &windows.Overlapped{}); err != nil { - return fmt.Errorf("failed to lock %q: %w", dir, err) + return fmt.Errorf("failed to lock %q: %w", name, err) } defer func() { if err := windows.UnlockFileEx(windows.Handle(dirFile.Fd()), 0, 1, 0, &windows.Overlapped{}); err != nil { - log.L.WithError(err).Errorf("failed to unlock %q", dir) + log.L.WithError(err).Errorf("failed to unlock %q", name) } }() return fn() diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 158553b22ca..73c70416690 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -32,6 +32,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime/v2/logging" "github.com/containerd/log" + "github.com/containerd/nerdctl/pkg/lockutil" ) const ( @@ -142,6 +143,17 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) { return logConfig, nil } +func getLockPath(dataStore, ns, id string) string { + return filepath.Join(dataStore, "containers", ns, id, "logger-lock") +} + +// WaitForLogger waits until the logger has finished executing and processing container logs +func WaitForLogger(dataStore, ns, id string) error { + return lockutil.WithLock(getLockPath(dataStore, ns, id), func() error { + return nil + }) +} + // getContainerWait loads the container from ID and returns its wait channel func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) { client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace)) @@ -236,11 +248,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if err != nil { return err } - if err := ready(); err != nil { + + lockFile := getLockPath(dataStore, config.Namespace, config.ID) + f, err := os.Create(lockFile) + if err != nil { return err } + defer f.Close() + + // the logger will obtain an exclusive lock on a file until the container is + // stopped and the driver has finished processing all output, + // so that waiting log viewers can be signalled when the process is complete. + return lockutil.WithLock(loggerLock, func() error { + if err := ready(); err != nil { + return err + } - return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config) + return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config) + }) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 return err diff --git a/pkg/mountutil/volumestore/volumestore.go b/pkg/mountutil/volumestore/volumestore.go index f645a167e71..7c8df8d32b6 100644 --- a/pkg/mountutil/volumestore/volumestore.go +++ b/pkg/mountutil/volumestore/volumestore.go @@ -110,7 +110,7 @@ func (vs *volumeStore) Create(name string, labels []string) (*native.Volume, err return os.WriteFile(volFilePath, labelsJSON, 0644) } - if err := lockutil.WithDirLock(vs.dir, fn); err != nil { + if err := lockutil.WithLock(vs.dir, fn); err != nil { return nil, err } @@ -188,7 +188,7 @@ func (vs *volumeStore) Remove(names []string) ([]string, error) { } return nil } - err := lockutil.WithDirLock(vs.dir, fn) + err := lockutil.WithLock(vs.dir, fn) return removed, err } diff --git a/pkg/namestore/namestore.go b/pkg/namestore/namestore.go index 1ab9b4b3b18..df4ee86faf7 100644 --- a/pkg/namestore/namestore.go +++ b/pkg/namestore/namestore.go @@ -61,7 +61,7 @@ func (x *nameStore) Acquire(name, id string) error { } return os.WriteFile(fileName, []byte(id), 0600) } - return lockutil.WithDirLock(x.dir, fn) + return lockutil.WithLock(x.dir, fn) } func (x *nameStore) Release(name, id string) error { @@ -88,7 +88,7 @@ func (x *nameStore) Release(name, id string) error { } return os.RemoveAll(fileName) } - return lockutil.WithDirLock(x.dir, fn) + return lockutil.WithLock(x.dir, fn) } func (x *nameStore) Rename(oldName, id, newName string) error { @@ -113,5 +113,5 @@ func (x *nameStore) Rename(oldName, id, newName string) error { } return os.Rename(oldFileName, newFileName) } - return lockutil.WithDirLock(x.dir, fn) + return lockutil.WithLock(x.dir, fn) } diff --git a/pkg/netutil/netutil.go b/pkg/netutil/netutil.go index f6e0079cd9f..1181e667f47 100644 --- a/pkg/netutil/netutil.go +++ b/pkg/netutil/netutil.go @@ -264,7 +264,7 @@ func (e *CNIEnv) CreateNetwork(opts CreateOptions) (*NetworkConfig, error) { //n } return e.writeNetworkConfig(net) } - err = lockutil.WithDirLock(e.NetconfPath, fn) + err = lockutil.WithLock(e.NetconfPath, fn) if err != nil { return nil, err } @@ -278,7 +278,7 @@ func (e *CNIEnv) RemoveNetwork(net *NetworkConfig) error { } return net.clean() } - return lockutil.WithDirLock(e.NetconfPath, fn) + return lockutil.WithLock(e.NetconfPath, fn) } // GetDefaultNetworkConfig checks whether the default network exists From 21730dbc346cb109c2bba4bc994d4a25c03c2fb6 Mon Sep 17 00:00:00 2001 From: fahed dorgaa Date: Mon, 6 Nov 2023 18:36:35 +0100 Subject: [PATCH 6/6] add retry attempts in logger to wait for task start Signed-off-by: fahed dorgaa --- pkg/logging/logging.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 73c70416690..1c40f845cb3 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -27,6 +27,7 @@ import ( "path/filepath" "sort" "sync" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" @@ -168,7 +169,26 @@ func getContainerWait(ctx context.Context, hostAddress string, config *logging.C if err != nil { return nil, err } - return task.Wait(ctx) + + // If task was not found, it's possible that the container runtime is still being created. + // Retry every 100ms. + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil, errors.New("timed out waiting for container task to start") + case <-ticker.C: + task, err = con.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + return task.Wait(ctx) + } + } } func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error { @@ -259,7 +279,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { // the logger will obtain an exclusive lock on a file until the container is // stopped and the driver has finished processing all output, // so that waiting log viewers can be signalled when the process is complete. - return lockutil.WithLock(loggerLock, func() error { + return lockutil.WithLock(lockFile, func() error { if err := ready(); err != nil { return err }