diff --git a/cgroup2/manager.go b/cgroup2/manager.go index 5bd8ecf1..3536fc97 100644 --- a/cgroup2/manager.go +++ b/cgroup2/manager.go @@ -764,32 +764,69 @@ func (c *Manager) MemoryEventFD() (int, uint32, error) { return fd, uint32(wd), nil } -func (c *Manager) EventChan() (<-chan Event, <-chan error) { - ec := make(chan Event) - errCh := make(chan error, 1) - go c.waitForEvents(ec, errCh) +// memoryEventNonBlockFD returns a non-blocking inotify file descriptor monitoring memory.events. +// +// NOTE: Block FD is expensive because unix.Read will block that thread once there is +// available data to read. In high scale scenarios, it will create a lot of threads. +func (c *Manager) memoryEventNonBlockFD() (_ *os.File, retErr error) { - return ec, errCh + rawFd, err := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK) + if err != nil { + return nil, fmt.Errorf("failed to create inotify fd: %w", err) + } + + fd := os.NewFile(uintptr(rawFd), "inotifyfd") + defer func() { + if retErr != nil { + fd.Close() + } + }() + + fpath := filepath.Join(c.path, "memory.events") + if _, err := unix.InotifyAddWatch(rawFd, fpath, unix.IN_MODIFY); err != nil { + return nil, fmt.Errorf("failed to add inotify watch for %q: %w", fpath, err) + } + + // monitor to detect process exit/cgroup deletion + evpath := filepath.Join(c.path, "cgroup.events") + if _, err = unix.InotifyAddWatch(rawFd, evpath, unix.IN_MODIFY); err != nil { + return nil, fmt.Errorf("failed to add inotify watch for %q: %w", evpath, err) + } + return fd, nil } -func (c *Manager) waitForEvents(ec chan<- Event, errCh chan<- error) { - defer close(errCh) +func (c *Manager) EventChan() (<-chan Event, <-chan error) { + ec := make(chan Event, 1) + errCh := make(chan error, 1) - fd, _, err := c.MemoryEventFD() + fd, err := c.memoryEventNonBlockFD() if err != nil { errCh <- err - return + return ec, errCh } - defer unix.Close(fd) - for { - buffer := make([]byte, unix.SizeofInotifyEvent*10) - bytesRead, err := unix.Read(fd, buffer) - if err != nil { - errCh <- err - return - } - if bytesRead >= unix.SizeofInotifyEvent { + go func() { + defer close(errCh) + defer fd.Close() + + for { + buffer := make([]byte, unix.SizeofInotifyEvent*10) + bytesRead, err := fd.Read(buffer) + if err != nil { + errCh <- err + return + } + + if bytesRead < unix.SizeofInotifyEvent { + continue + } + + // Check cgroup.events first + shouldExit := false + if c.isCgroupEmpty() { + shouldExit = true + } + out := make(map[string]uint64) if err := readKVStatsFile(c.path, "memory.events", out); err != nil { // When cgroup is deleted read may return -ENODEV instead of -ENOENT from open. @@ -798,6 +835,7 @@ func (c *Manager) waitForEvents(ec chan<- Event, errCh chan<- error) { } return } + ec <- Event{ Low: out["low"], High: out["high"], @@ -805,11 +843,13 @@ func (c *Manager) waitForEvents(ec chan<- Event, errCh chan<- error) { OOM: out["oom"], OOMKill: out["oom_kill"], } - if c.isCgroupEmpty() { + + if shouldExit { return } } - } + }() + return ec, errCh } func setDevices(path string, devices []specs.LinuxDeviceCgroup) error { diff --git a/cgroup2/manager_test.go b/cgroup2/manager_test.go index dd9d2caa..4902e576 100644 --- a/cgroup2/manager_test.go +++ b/cgroup2/manager_test.go @@ -118,6 +118,62 @@ func TestEventChanCleanupOnCgroupRemoval(t *testing.T) { goleak.VerifyNone(t) } +func TestEventChanCleanupAfterOOMKill(t *testing.T) { + checkCgroupMode(t) + + groupPath := fmt.Sprintf("/testing-oom-watcher-%d", time.Now().UnixNano()) + c, err := NewManager(defaultCgroup2Path, groupPath, + &Resources{ + Memory: &Memory{ + Max: toPtr(int64(15 * 1024 * 1024)), // 15MB + Swap: toPtr(int64(15 * 1024 * 1024)), // 15MB + }, + }, + ) + require.NoError(t, err, "failed to init new cgroup manager") + defer func() { + require.NoError(t, c.Delete()) + }() + + cgroupFD, err := os.Open(c.path) + require.NoError(t, err, "failed to open cgroup path") + defer cgroupFD.Close() + + evCh, errCh := c.EventChan() + + cmd := exec.Command("dd", "if=/dev/zero", "of=/dev/null", "bs=64M") + cmd.SysProcAttr = &syscall.SysProcAttr{ + Pdeathsig: syscall.SIGKILL, + UseCgroupFD: true, + CgroupFD: int(cgroupFD.Fd()), + } + + err = cmd.Start() + require.NoError(t, err, "failed to start dd process") + + err = cmd.Wait() + require.Error(t, err) + + for ev := range evCh { + t.Logf("Received memory event: %+v", ev) + if ev.OOMKill > 0 { + break + } + } + + done := false + for !done { + select { + case err := <-errCh: + require.NoError(t, err, "unexpected error on error channel") + done = true + case <-time.After(5 * time.Second): + t.Fatal("Timed out") + } + } + goleak.VerifyNone(t) +} + func TestSystemdFullPath(t *testing.T) { tests := []struct { inputSlice string @@ -406,3 +462,7 @@ func BenchmarkStat(b *testing.B) { require.NoError(b, err) } } + +func toPtr[T any](v T) *T { + return &v +}