collector/filesystem_linux.go: 172 changes (115 additions, 57 deletions)
@@ -39,17 +39,28 @@ const (
 var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
 	"how long to wait for a mount to respond before marking it as stale").
 	Hidden().Default("5s").Duration()
+var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers",
+	"how many stat calls to process simultaneously. Providing 0 will attempt to process all stat calls simultaneously").
+	Hidden().Default("4").Int()
 var stuckMounts = make(map[string]struct{})
 var stuckMountsMtx = &sync.Mutex{}
 
 // GetStats returns filesystem stats.
 func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
-	mps, err := mountPointDetails(c.logger)
+	mpsAll, err := mountPointDetails(c.logger)
 	if err != nil {
 		return nil, err
 	}
 	stats := []filesystemStats{}
-	for _, labels := range mps {
+
+	labelChan := make(chan filesystemLabels)
+	statChan := make(chan filesystemStats)
+	wg := sync.WaitGroup{}
+
+	mps := []filesystemLabels{}
+
+	// Remove all ignored mount points.
+	for _, labels := range mpsAll {
 		if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
 			level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
 			continue
@@ -58,84 +69,131 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
 			level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
 			continue
 		}
-		stuckMountsMtx.Lock()
-		if _, ok := stuckMounts[labels.mountPoint]; ok {
-			stats = append(stats, filesystemStats{
-				labels:      labels,
-				deviceError: 1,
-			})
-			level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
-			stuckMountsMtx.Unlock()
-			continue
-		}
-		stuckMountsMtx.Unlock()
+		mps = append(mps, labels)
+	}
 
+	workerCount := *statWorkerCount
+	if workerCount == 0 {
+		workerCount = len(mps)
+	} else if workerCount < 0 {
+		workerCount = 1
+	}
+
+	// Effectively creates a thread pool to distribute stat calls amongst, allowing multiple stat calls to run in parallel.
+	for i := 0; i < workerCount; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for labels := range labelChan {
+				statChan <- c.processStat(labels)
+			}
+		}()
+	}
+
+	go func() {
+		for _, labels := range mps {
+			stuckMountsMtx.Lock()
+			if _, ok := stuckMounts[labels.mountPoint]; ok {
+				statChan <- filesystemStats{
+					labels:      labels,
+					deviceError: 1,
+				}
+				level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
+				stuckMountsMtx.Unlock()
+				continue
+			}
+			stuckMountsMtx.Unlock()
+			labelChan <- labels
+		}
+		close(labelChan)
+		wg.Wait()
+		close(statChan)
+	}()
+
+	for stat := range statChan {
+		stats = append(stats, stat)
+	}
+	return stats, nil
+}
+
-		// The success channel is used do tell the "watcher" that the stat
-		// finished successfully. The channel is closed on success.
-		success := make(chan struct{})
-		go stuckMountWatcher(labels.mountPoint, success, c.logger)
-
-		buf := new(unix.Statfs_t)
-		err = unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
-		stuckMountsMtx.Lock()
-		close(success)
-		// If the mount has been marked as stuck, unmark it and log it's recovery.
-		if _, ok := stuckMounts[labels.mountPoint]; ok {
-			level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
-			delete(stuckMounts, labels.mountPoint)
-		}
-		stuckMountsMtx.Unlock()
-
-		if err != nil {
-			stats = append(stats, filesystemStats{
-				labels:      labels,
-				deviceError: 1,
-			})
-			level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
-			continue
-		}
+func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemStats {
+	// The done channel is used to tell the "watcher" that the stat call completed.
+	done := make(chan error)
+
+	// Run the Statfs call, which may hang.
+	buf := new(unix.Statfs_t)
+	go runStatfs(buf, labels, done, c.logger)
+
+	// Wait for the Statfs call to complete or time out. If it completes in time,
+	// return whatever Statfs returned; otherwise return a timeout error.
+	err := waitStatfs(labels.mountPoint, done, c.logger)
+	close(done)
+
+	if err != nil {
+		level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
+		return filesystemStats{
+			labels:      labels,
+			deviceError: 1,
+		}
+	}
 
-		var ro float64
-		for _, option := range strings.Split(labels.options, ",") {
-			if option == "ro" {
-				ro = 1
-				break
-			}
+	var ro float64
+	for _, option := range strings.Split(labels.options, ",") {
+		if option == "ro" {
+			ro = 1
+			break
 		}
+	}
 
-		stats = append(stats, filesystemStats{
-			labels:    labels,
-			size:      float64(buf.Blocks) * float64(buf.Bsize),
-			free:      float64(buf.Bfree) * float64(buf.Bsize),
-			avail:     float64(buf.Bavail) * float64(buf.Bsize),
-			files:     float64(buf.Files),
-			filesFree: float64(buf.Ffree),
-			ro:        ro,
-		})
-	}
-	return stats, nil
+	return filesystemStats{
+		labels:    labels,
+		size:      float64(buf.Blocks) * float64(buf.Bsize),
+		free:      float64(buf.Bfree) * float64(buf.Bsize),
+		avail:     float64(buf.Bavail) * float64(buf.Bsize),
+		files:     float64(buf.Files),
+		filesFree: float64(buf.Ffree),
+		ro:        ro,
+	}
 }
 
-// stuckMountWatcher listens on the given success channel and if the channel closes
-// then the watcher does nothing. If instead the timeout is reached, the
-// mount point that is being watched is marked as stuck.
-func stuckMountWatcher(mountPoint string, success chan struct{}, logger log.Logger) {
+// runStatfs runs the unix.Statfs call, which may hang. When the call completes,
+// stuckMounts is used to determine whether the call had already timed out; if it
+// had, the stuck marker is cleared instead of sending the result.
+func runStatfs(buf *unix.Statfs_t, labels filesystemLabels, done chan error, logger log.Logger) {
+	err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
+	stuckMountsMtx.Lock()
+	// If the mount has been marked as stuck, unmark it and log its recovery.
+	if _, ok := stuckMounts[labels.mountPoint]; ok {
+		level.Debug(logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
+		delete(stuckMounts, labels.mountPoint)
+	} else {
+		done <- err
+	}
+	stuckMountsMtx.Unlock()
+}
+
+// waitStatfs listens on the given done channel and returns whatever error is
+// returned by the Statfs call. If instead the timeout is reached, the mount point
+// is marked as stuck and an error is returned.
+func waitStatfs(mountPoint string, done chan error, logger log.Logger) error {
+	var err error
 	select {
-	case <-success:
-		// Success
+	case err = <-done:
+		// Success.
 	case <-time.After(*mountTimeout):
-		// Timed out, mark mount as stuck
+		// Timed out, mark mount as stuck.
 		stuckMountsMtx.Lock()
 		select {
-		case <-success:
-			// Success came in just after the timeout was reached, don't label the mount as stuck
+		case err = <-done:
+			// Success came in just after the timeout was reached, don't label the mount as stuck.
 		default:
+			err = errors.New("Statfs timed out")
 			level.Debug(logger).Log("msg", "Mount point timed out, it is being labeled as stuck and will not be monitored", "mountpoint", mountPoint)
 			stuckMounts[mountPoint] = struct{}{}
 		}
 		stuckMountsMtx.Unlock()
 	}
+	return err
 }
 
 func mountPointDetails(logger log.Logger) ([]filesystemLabels, error) {
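Review note: the new GetStats is a bounded fan-out/fan-in. A fixed pool of workers (sized by the hidden --collector.filesystem.stat-workers flag) drains labelChan, a feeder goroutine closes labelChan and then statChan once all workers are done, and the caller simply collects results from statChan. The following minimal, self-contained sketch shows the same pattern; every name in it (collect, slowStat, result, and so on) is an illustrative stand-in, not code from this PR:

package main

import (
	"fmt"
	"sync"
)

// result stands in for filesystemStats in the PR.
type result struct {
	input string
	value int
}

// slowStat stands in for processStat; any slow per-item work goes here.
func slowStat(input string) result {
	return result{input: input, value: len(input)}
}

func collect(inputs []string, workerCount int) []result {
	// Guard against a non-positive pool size (the PR instead maps 0 to
	// "one worker per mount point").
	if workerCount < 1 {
		workerCount = 1
	}

	in := make(chan string)
	out := make(chan result)
	wg := sync.WaitGroup{}

	// Fan out: a fixed pool of workers drains the input channel.
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for input := range in {
				out <- slowStat(input)
			}
		}()
	}

	// Feed the inputs, then close the output channel once all workers finish.
	go func() {
		for _, input := range inputs {
			in <- input
		}
		close(in)
		wg.Wait()
		close(out)
	}()

	// Fan in: ranging over out ends when out is closed.
	results := []result{}
	for r := range out {
		results = append(results, r)
	}
	return results
}

func main() {
	fmt.Println(collect([]string{"/", "/home", "/var"}, 2))
}

Closing out only after wg.Wait() returns is what lets the plain for-range loop terminate without a separate done signal; the feeder goroutine in the diff above uses exactly this ordering for labelChan and statChan.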
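Review note: processStat, runStatfs, and waitStatfs together guard a potentially hanging unix.Statfs with a timeout. The sketch below shows that timeout mechanism in isolation, under one simplifying assumption: the result channel is buffered with capacity 1, so a late finisher can always deposit its error and exit. The PR instead keeps the channel unbuffered and relies on the stuckMounts bookkeeping, which also lets a recovered mount be detected, logged, and unmarked:

package main

import (
	"errors"
	"fmt"
	"time"
)

// callWithTimeout runs fn in a goroutine and waits up to timeout for it to
// finish. The buffered channel means the goroutine's send never blocks, so a
// call that completes after the timeout still exits cleanly.
func callWithTimeout(fn func() error, timeout time.Duration) error {
	done := make(chan error, 1) // capacity 1: the send below cannot block
	go func() {
		done <- fn()
	}()
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return errors.New("call timed out")
	}
}

func main() {
	err := callWithTimeout(func() error {
		time.Sleep(2 * time.Second) // stand-in for a hanging unix.Statfs
		return nil
	}, 100*time.Millisecond)
	fmt.Println(err) // prints: call timed out
}

Without the capacity-1 buffer (or the PR's stuckMounts check in runStatfs), a Statfs call that returns after the timeout would block forever on a send nobody receives, leaking one goroutine per stuck mount.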