Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 55 additions & 41 deletions libcontainer/cgroups/systemd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,8 @@ import (
)

var (
connOnce sync.Once
connDbus *systemdDbus.Conn
connErr error

versionOnce sync.Once
version int
versionErr error

isRunningSystemdOnce sync.Once
isRunningSystemd bool
Expand Down Expand Up @@ -284,19 +279,6 @@ func generateDeviceProperties(rules []*configs.DeviceRule) ([]systemdDbus.Proper
return properties, nil
}

// getDbusConnection lazy initializes systemd dbus connection
// and returns it
func getDbusConnection(rootless bool) (*systemdDbus.Conn, error) {
connOnce.Do(func() {
if rootless {
connDbus, connErr = NewUserSystemdDbus()
} else {
connDbus, connErr = systemdDbus.New()
}
})
return connDbus, connErr
}

func newProp(name string, units interface{}) systemdDbus.Property {
return systemdDbus.Property{
Name: name,
Expand All @@ -312,19 +294,29 @@ func getUnitName(c *configs.Cgroup) string {
return c.Name
}

// isUnitExists returns true if the error is that a systemd unit already exists.
func isUnitExists(err error) bool {
// isDbusError returns true if the error is a specific dbus error.
func isDbusError(err error, name string) bool {
if err != nil {
if dbusError, ok := err.(dbus.Error); ok {
return strings.Contains(dbusError.Name, "org.freedesktop.systemd1.UnitExists")
var derr *dbus.Error
if errors.As(err, &derr) {
return strings.Contains(derr.Name, name)
}
}
return false
}

func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []systemdDbus.Property) error {
// isUnitExists returns true if the error is that a systemd unit already exists.
func isUnitExists(err error) bool {
return isDbusError(err, "org.freedesktop.systemd1.UnitExists")
}

func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Property) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StartTransientUnit(unitName, "replace", properties, statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StartTransientUnit(unitName, "replace", properties, statusChan)
return err
})
if err == nil {
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()

Expand All @@ -333,11 +325,11 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s
close(statusChan)
// Please refer to https://godoc.org/github.com/coreos/go-systemd/dbus#Conn.StartUnit
if s != "done" {
dbusConnection.ResetFailedUnit(unitName)
resetFailedUnit(cm, unitName)
return errors.Errorf("error creating systemd unit `%s`: got `%s`", unitName, s)
}
case <-timeout.C:
dbusConnection.ResetFailedUnit(unitName)
resetFailedUnit(cm, unitName)
return errors.New("Timeout waiting for systemd to create " + unitName)
}
} else if !isUnitExists(err) {
Expand All @@ -347,9 +339,13 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s
return nil
}

func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
func stopUnit(cm *dbusConnManager, unitName string) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StopUnit(unitName, "replace", statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StopUnit(unitName, "replace", statusChan)
return err
})
if err == nil {
select {
case s := <-statusChan:
close(statusChan)
Expand All @@ -364,20 +360,40 @@ func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
return nil
}

func systemdVersion(conn *systemdDbus.Conn) (int, error) {
func resetFailedUnit(cm *dbusConnManager, name string) {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.ResetFailedUnit(name)
})
if err != nil {
logrus.Warnf("unable to reset failed unit: %v", err)
}
}

func setUnitProperties(cm *dbusConnManager, name string, properties ...systemdDbus.Property) error {
return cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.SetUnitProperties(name, true, properties...)
})
}

func systemdVersion(cm *dbusConnManager) int {
versionOnce.Do(func() {
version = -1
verStr, err := conn.GetManagerProperty("Version")
if err != nil {
versionErr = err
return
var verStr string
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
var err error
verStr, err = c.GetManagerProperty("Version")
return err
})
if err == nil {
version, err = systemdVersionAtoi(verStr)
}

version, versionErr = systemdVersionAtoi(verStr)
return
if err != nil {
logrus.WithError(err).Error("unable to get systemd version")
}
})

return version, versionErr
return version
}

func systemdVersionAtoi(verStr string) (int, error) {
Expand All @@ -395,13 +411,11 @@ func systemdVersionAtoi(verStr string) (int, error) {
return ver, errors.Wrapf(err, "can't parse version %s", verStr)
}

func addCpuQuota(conn *systemdDbus.Conn, properties *[]systemdDbus.Property, quota int64, period uint64) {
func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota int64, period uint64) {
if period != 0 {
// systemd only supports CPUQuotaPeriodUSec since v242
sdVer, err := systemdVersion(conn)
if err != nil {
logrus.Warnf("systemdVersion: %s", err)
} else if sdVer >= 242 {
sdVer := systemdVersion(cm)
if sdVer >= 242 {
*properties = append(*properties,
newProp("CPUQuotaPeriodUSec", period))
}
Expand Down
89 changes: 89 additions & 0 deletions libcontainer/cgroups/systemd/dbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// +build linux

package systemd

import (
"sync"

systemdDbus "github.com/coreos/go-systemd/v22/dbus"
dbus "github.com/godbus/dbus/v5"
)

type dbusConnManager struct {
conn *systemdDbus.Conn
rootless bool
sync.RWMutex
}

// newDbusConnManager initializes systemd dbus connection manager.
func newDbusConnManager(rootless bool) *dbusConnManager {
return &dbusConnManager{
rootless: rootless,
}
}

// getConnection lazily initializes and returns systemd dbus connection.
func (d *dbusConnManager) getConnection() (*systemdDbus.Conn, error) {
// In the case where d.conn != nil
// Use the read lock the first time to ensure
// that Conn can be acquired at the same time.
d.RLock()
if conn := d.conn; conn != nil {
d.RUnlock()
return conn, nil
}
d.RUnlock()

// In the case where d.conn == nil
// Use write lock to ensure that only one
// will be created
d.Lock()
defer d.Unlock()
if conn := d.conn; conn != nil {
return conn, nil
}

conn, err := d.newConnection()
if err != nil {
return nil, err
}
d.conn = conn
return conn, nil
}

func (d *dbusConnManager) newConnection() (*systemdDbus.Conn, error) {
if d.rootless {
return newUserSystemdDbus()
}
return systemdDbus.New()
}

// resetConnection resets the connection to its initial state
// (so it can be reconnected if necessary).
func (d *dbusConnManager) resetConnection(conn *systemdDbus.Conn) {
d.Lock()
defer d.Unlock()
if d.conn != nil && d.conn == conn {
d.conn.Close()
d.conn = nil
}
}

var errDbusConnClosed = dbus.ErrClosed.Error()

// retryOnDisconnect calls op, and if the error it returns is about closed dbus
// connection, the connection is re-established and the op is retried. This helps
// with the situation when dbus is restarted and we have a stale connection.
func (d *dbusConnManager) retryOnDisconnect(op func(*systemdDbus.Conn) error) error {
for {
conn, err := d.getConnection()
if err != nil {
return err
}
err = op(conn)
if !isDbusError(err, errDbusConnClosed) {
return err
}
d.resetConnection(conn)
}
}
4 changes: 2 additions & 2 deletions libcontainer/cgroups/systemd/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/pkg/errors"
)

// NewUserSystemdDbus creates a connection for systemd user-instance.
func NewUserSystemdDbus() (*systemdDbus.Conn, error) {
// newUserSystemdDbus creates a connection for systemd user-instance.
func newUserSystemdDbus() (*systemdDbus.Conn, error) {
addr, err := DetectUserDbusSessionBusAddress()
if err != nil {
return nil, err
Expand Down
29 changes: 9 additions & 20 deletions libcontainer/cgroups/systemd/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type legacyManager struct {
mu sync.Mutex
cgroups *configs.Cgroup
paths map[string]string
dbus *dbusConnManager
}

func NewLegacyManager(cg *configs.Cgroup, paths map[string]string) cgroups.Manager {
return &legacyManager{
cgroups: cg,
paths: paths,
dbus: newDbusConnManager(false),
}
}

Expand Down Expand Up @@ -57,7 +59,7 @@ var legacySubsystems = []subsystem{
&fs.NameGroup{GroupName: "name=systemd"},
}

func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]systemdDbus.Property, error) {
func genV1ResourcesProperties(c *configs.Cgroup, cm *dbusConnManager) ([]systemdDbus.Property, error) {
var properties []systemdDbus.Property
r := c.Resources

Expand All @@ -77,7 +79,7 @@ func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst
newProp("CPUShares", r.CpuShares))
}

addCpuQuota(conn, &properties, r.CpuQuota, r.CpuPeriod)
addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod)

if r.BlkioWeight != 0 {
properties = append(properties,
Expand Down Expand Up @@ -155,11 +157,7 @@ func (m *legacyManager) Apply(pid int) error {
properties = append(properties,
newProp("DefaultDependencies", false))

dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
resourcesProperties, err := genV1ResourcesProperties(c, dbusConnection)
resourcesProperties, err := genV1ResourcesProperties(c, m.dbus)
if err != nil {
return err
}
Expand All @@ -174,7 +172,7 @@ func (m *legacyManager) Apply(pid int) error {
}
}

if err := startUnit(dbusConnection, unitName, properties); err != nil {
if err := startUnit(m.dbus, unitName, properties); err != nil {
return err
}

Expand Down Expand Up @@ -212,13 +210,8 @@ func (m *legacyManager) Destroy() error {
m.mu.Lock()
defer m.mu.Unlock()

dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
unitName := getUnitName(m.cgroups)
stopErr := stopUnit(m.dbus, getUnitName(m.cgroups))

stopErr := stopUnit(dbusConnection, unitName)
// Both on success and on error, cleanup all the cgroups we are aware of.
// Some of them were created directly by Apply() and are not managed by systemd.
if err := cgroups.RemovePaths(m.paths); err != nil {
Expand Down Expand Up @@ -342,11 +335,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
if m.cgroups.Paths != nil {
return nil
}
dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
properties, err := genV1ResourcesProperties(container.Cgroups, dbusConnection)
properties, err := genV1ResourcesProperties(container.Cgroups, m.dbus)
if err != nil {
return err
}
Expand Down Expand Up @@ -374,7 +363,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
}
}

if err := dbusConnection.SetUnitProperties(getUnitName(container.Cgroups), true, properties...); err != nil {
if err := setUnitProperties(m.dbus, getUnitName(container.Cgroups), properties...); err != nil {
_ = m.Freeze(targetFreezerState)
return err
}
Expand Down
Loading