diff --git a/cmd/cgctl/main.go b/cmd/cgctl/main.go index 5e102349..67243703 100644 --- a/cmd/cgctl/main.go +++ b/cmd/cgctl/main.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "os" + "strconv" "github.com/containerd/cgroups" v2 "github.com/containerd/cgroups/v2" @@ -50,6 +51,8 @@ func main() { listCommand, listControllersCommand, statCommand, + newSystemdCommand, + deleteSystemdCommand, } app.Before = func(clix *cli.Context) error { if clix.GlobalBool("debug") { @@ -161,6 +164,42 @@ var statCommand = cli.Command{ }, } +var newSystemdCommand = cli.Command{ + Name: "systemd", + Usage: "create a new systemd managed cgroup", + Action: func(clix *cli.Context) error { + path := clix.Args().First() + pidStr := clix.Args().Get(1) + pid := os.Getpid() + if pidStr != "" { + pid, _ = strconv.Atoi(pidStr) + } + + _, err := v2.NewSystemd("", path, pid, &v2.Resources{}) + if err != nil { + return err + } + return nil + }, +} + +var deleteSystemdCommand = cli.Command{ + Name: "del-systemd", + Usage: "delete a systemd managed cgroup", + Action: func(clix *cli.Context) error { + path := clix.Args().First() + m, err := v2.LoadSystemd("", path) + if err != nil { + return err + } + err = m.DeleteSystemd() + if err != nil { + return err + } + return nil + }, +} + var modeCommand = cli.Command{ Name: "mode", Usage: "return the cgroup mode that is mounted on the system", diff --git a/v2/cpu.go b/v2/cpu.go index 648a93d3..65282ff0 100644 --- a/v2/cpu.go +++ b/v2/cpu.go @@ -17,6 +17,7 @@ package v2 import ( + "math" "strconv" "strings" ) @@ -38,6 +39,21 @@ type CPU struct { Mems string } +func (c CPUMax) extractQuotaAndPeriod() (int64, uint64) { + var ( + quota int64 + period uint64 + ) + values := strings.Split(string(c), " ") + if values[0] == "max" { + quota = math.MaxInt64 + } else { + quota, _ = strconv.ParseInt(values[0], 10, 64) + } + period, _ = strconv.ParseUint(values[1], 10, 64) + return quota, period +} + func (r *CPU) Values() (o []Value) { if r.Weight != nil { o = append(o, Value{ diff --git a/v2/cpuv2_test.go b/v2/cpuv2_test.go index f72ef2c8..e5ace2f3 100644 --- a/v2/cpuv2_test.go +++ b/v2/cpuv2_test.go @@ -18,9 +18,12 @@ package v2 import ( "fmt" + "math" "os" "strconv" "testing" + + "github.com/stretchr/testify/assert" ) func TestCgroupv2CpuStats(t *testing.T) { @@ -52,3 +55,36 @@ func TestCgroupv2CpuStats(t *testing.T) { checkFileContent(t, c.path, "cpuset.cpus", "0") checkFileContent(t, c.path, "cpuset.mems", "0") } + +func TestSystemdCgroupCpuController(t *testing.T) { + checkCgroupMode(t) + group := fmt.Sprintf("testing-cpu-%d.scope", os.Getpid()) + var weight uint64 = 100 + res := Resources{CPU: &CPU{Weight: &weight}} + c, err := NewSystemd("", group, os.Getpid(), &res) + if err != nil { + t.Fatal("failed to init new cgroup systemd manager: ", err) + } + checkFileContent(t, c.path, "cpu.weight", strconv.FormatUint(weight, 10)) +} + +func TestExtractQuotaAndPeriod(t *testing.T) { + var ( + period uint64 + quota int64 + ) + quota = 10000 + period = 8000 + cpuMax := NewCPUMax("a, &period) + tquota, tPeriod := cpuMax.extractQuotaAndPeriod() + + assert.Equal(t, quota, tquota) + assert.Equal(t, period, tPeriod) + + //case with nil quota which makes it "max" - max int val + cpuMax2 := NewCPUMax(nil, &period) + tquota2, tPeriod2 := cpuMax2.extractQuotaAndPeriod() + + assert.Equal(t, int64(math.MaxInt64), tquota2) + assert.Equal(t, period, tPeriod2) +} diff --git a/v2/manager.go b/v2/manager.go index 85f7d32e..c15dd31f 100644 --- a/v2/manager.go +++ b/v2/manager.go @@ -25,20 +25,31 @@ import ( "path/filepath" "strconv" "strings" + "sync" "syscall" "time" - "github.com/opencontainers/runtime-spec/specs-go" - "golang.org/x/sys/unix" "github.com/containerd/cgroups/v2/stats" + "github.com/godbus/dbus/v5" + "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + systemdDbus "github.com/coreos/go-systemd/v22/dbus" ) const ( - subtreeControl = "cgroup.subtree_control" - controllersFile = "cgroup.controllers" + subtreeControl = "cgroup.subtree_control" + controllersFile = "cgroup.controllers" + defaultCgroup2Path = "/sys/fs/cgroup" + defaultSlice = "system.slice" +) + +var ( + canDelegate bool + once sync.Once ) type cgValuer interface { @@ -583,3 +594,126 @@ func setDevices(path string, devices []specs.LinuxDeviceCgroup) error { } return nil } + +func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, error) { + if slice == "" { + slice = defaultSlice + } + path := filepath.Join(defaultCgroup2Path, slice, group) + conn, err := systemdDbus.New() + if err != nil { + return &Manager{}, err + } + defer conn.Close() + + properties := []systemdDbus.Property{ + systemdDbus.PropDescription(fmt.Sprintf("cgroup %s", group)), + newSystemdProperty("DefaultDependencies", false), + newSystemdProperty("MemoryAccounting", true), + newSystemdProperty("CPUAccounting", true), + newSystemdProperty("IOAccounting", true), + } + + // if we create a slice, the parent is defined via a Wants= + if strings.HasSuffix(group, ".slice") { + properties = append(properties, systemdDbus.PropWants(defaultSlice)) + } else { + // otherwise, we use Slice= + properties = append(properties, systemdDbus.PropSlice(defaultSlice)) + } + + // only add pid if its valid, -1 is used w/ general slice creation. + if pid != -1 { + properties = append(properties, newSystemdProperty("PIDs", []uint32{uint32(pid)})) + } + + if resources.Memory != nil && *resources.Memory.Max != 0 { + properties = append(properties, + newSystemdProperty("MemoryMax", uint64(*resources.Memory.Max))) + } + + if resources.CPU != nil && *resources.CPU.Weight != 0 { + properties = append(properties, + newSystemdProperty("CPUWeight", *resources.CPU.Weight)) + } + + if resources.CPU != nil && resources.CPU.Max != "" { + quota, period := resources.CPU.Max.extractQuotaAndPeriod() + // cpu.cfs_quota_us and cpu.cfs_period_us are controlled by systemd. + // corresponds to USEC_INFINITY in systemd + // if USEC_INFINITY is provided, CPUQuota is left unbound by systemd + // always setting a property value ensures we can apply a quota and remove it later + cpuQuotaPerSecUSec := uint64(math.MaxUint64) + if quota > 0 { + // systemd converts CPUQuotaPerSecUSec (microseconds per CPU second) to CPUQuota + // (integer percentage of CPU) internally. This means that if a fractional percent of + // CPU is indicated by Resources.CpuQuota, we need to round up to the nearest + // 10ms (1% of a second) such that child cgroups can set the cpu.cfs_quota_us they expect. + cpuQuotaPerSecUSec = uint64(quota*1000000) / period + if cpuQuotaPerSecUSec%10000 != 0 { + cpuQuotaPerSecUSec = ((cpuQuotaPerSecUSec / 10000) + 1) * 10000 + } + } + properties = append(properties, + newSystemdProperty("CPUQuotaPerSecUSec", cpuQuotaPerSecUSec)) + } + + // If we can delegate, we add the property back in + if canDelegate { + properties = append(properties, newSystemdProperty("Delegate", true)) + } + + if resources.Pids != nil && resources.Pids.Max > 0 { + properties = append(properties, + newSystemdProperty("TasksAccounting", true), + newSystemdProperty("TasksMax", uint64(resources.Pids.Max))) + } + + statusChan := make(chan string, 1) + if _, err := conn.StartTransientUnit(group, "replace", properties, statusChan); err == nil { + select { + case <-statusChan: + case <-time.After(time.Second): + logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group) + } + } else if !isUnitExists(err) { + return &Manager{}, err + } + + return &Manager{ + path: path, + }, nil +} + +func LoadSystemd(slice, group string) (*Manager, error) { + if slice == "" { + slice = defaultSlice + } + group = filepath.Join(defaultCgroup2Path, slice, group) + return &Manager{ + path: group, + }, nil +} + +func (c *Manager) DeleteSystemd() error { + conn, err := systemdDbus.New() + if err != nil { + return err + } + defer conn.Close() + group := systemdUnitFromPath(c.path) + ch := make(chan string) + _, err = conn.StopUnit(group, "replace", ch) + if err != nil { + return err + } + <-ch + return nil +} + +func newSystemdProperty(name string, units interface{}) systemdDbus.Property { + return systemdDbus.Property{ + Name: name, + Value: dbus.MakeVariant(units), + } +} diff --git a/v2/memoryv2_test.go b/v2/memoryv2_test.go index 56ba8454..58703a95 100644 --- a/v2/memoryv2_test.go +++ b/v2/memoryv2_test.go @@ -50,3 +50,18 @@ func TestCgroupv2MemoryStats(t *testing.T) { checkFileContent(t, c.path, "memory.swap.max", "314572800") checkFileContent(t, c.path, "memory.max", "629145600") } + +func TestSystemdCgroupMemoryController(t *testing.T) { + checkCgroupMode(t) + group := fmt.Sprintf("testing-memory-%d.scope", os.Getpid()) + res := Resources{ + Memory: &Memory{ + Max: pointerInt64(629145600), + }, + } + c, err := NewSystemd("", group, os.Getpid(), &res) + if err != nil { + t.Fatal("failed to init new cgroup systemd manager: ", err) + } + checkFileContent(t, c.path, "memory.max", "629145600") +} diff --git a/v2/testutils_test.go b/v2/testutils_test.go index 60147682..ae26b45c 100644 --- a/v2/testutils_test.go +++ b/v2/testutils_test.go @@ -28,8 +28,6 @@ import ( "golang.org/x/sys/unix" ) -const defaultCgroup2Path = "/sys/fs/cgroup" - func checkCgroupMode(t *testing.T) { var st syscall.Statfs_t if err := syscall.Statfs(defaultCgroup2Path, &st); err != nil { diff --git a/v2/utils.go b/v2/utils.go index a673211d..0594d6b2 100644 --- a/v2/utils.go +++ b/v2/utils.go @@ -28,6 +28,8 @@ import ( "strings" "time" + "github.com/godbus/dbus" + "github.com/containerd/cgroups/v2/stats" "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" @@ -238,7 +240,6 @@ func ToResources(spec *specs.LinuxResources) *Resources { func getStatFileContentUint64(filePath string) uint64 { contents, err := ioutil.ReadFile(filePath) if err != nil { - logrus.Error(err) return 0 } trimmed := strings.TrimSpace(string(contents)) @@ -361,3 +362,18 @@ func toRdmaEntry(strEntries []string) []*stats.RdmaEntry { } return rdmaEntries } + +// isUnitExists returns true if the error is that a systemd unit already exists. +func isUnitExists(err error) bool { + if err != nil { + if dbusError, ok := err.(dbus.Error); ok { + return strings.Contains(dbusError.Name, "org.freedesktop.systemd1.UnitExists") + } + } + return false +} + +func systemdUnitFromPath(path string) string { + _, unit := filepath.Split(path) + return unit +}