diff --git a/collector/filesystem_linux.go b/collector/filesystem_linux.go
index 2d4e2c2b71..434cf0585d 100644
--- a/collector/filesystem_linux.go
+++ b/collector/filesystem_linux.go
@@ -39,17 +39,30 @@ const (
 var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
 	"how long to wait for a mount to respond before marking it as stale").
 	Hidden().Default("5s").Duration()
+var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers",
+	"how many stat calls to process simultaneously. Providing 0 will attempt to process all stat calls simultaneously").
+	Hidden().Default("4").Int()
 var stuckMounts = make(map[string]struct{})
 var stuckMountsMtx = &sync.Mutex{}
 
 // GetStats returns filesystem stats.
 func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
-	mps, err := mountPointDetails(c.logger)
+	mpsAll, err := mountPointDetails(c.logger)
 	if err != nil {
 		return nil, err
 	}
 	stats := []filesystemStats{}
-	for _, labels := range mps {
+
+	// labelChan feeds mount points to the stat workers; statChan carries
+	// their results back to the collecting loop at the end of GetStats.
+	labelChan := make(chan filesystemLabels)
+	statChan := make(chan filesystemStats)
+	var wg sync.WaitGroup
+
+	mps := []filesystemLabels{}
+
+	// Remove all ignored mount points.
+	for _, labels := range mpsAll {
 		if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
 			level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
 			continue
@@ -58,84 +71,141 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
 			level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
 			continue
 		}
-		stuckMountsMtx.Lock()
-		if _, ok := stuckMounts[labels.mountPoint]; ok {
-			stats = append(stats, filesystemStats{
-				labels:      labels,
-				deviceError: 1,
-			})
-			level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
+		mps = append(mps, labels)
+	}
+
+	workerCount := *statWorkerCount
+	if workerCount == 0 {
+		workerCount = len(mps)
+	} else if workerCount < 0 {
+		workerCount = 1
+	}
+
+	// Effectively creates a threadpool to distribute stat calls amongst. Allows multiple stat calls to be run in parallel.
+	for i := 0; i < workerCount; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for labels := range labelChan {
+				statChan <- c.processStat(labels)
+			}
+		}()
+	}
+
+	// Feed the workers, then close statChan once every worker has finished so
+	// that the collecting loop below terminates.
+	go func() {
+		for _, labels := range mps {
+			// Hold the mutex only for the map lookup: blocking on a channel send
+			// while holding it would stall every goroutine that needs the mutex.
+			stuckMountsMtx.Lock()
+			_, stuck := stuckMounts[labels.mountPoint]
 			stuckMountsMtx.Unlock()
-			continue
+			if stuck {
+				level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
+				statChan <- filesystemStats{
+					labels:      labels,
+					deviceError: 1,
+				}
+				continue
+			}
+			labelChan <- labels
 		}
-		stuckMountsMtx.Unlock()
+		close(labelChan)
+		wg.Wait()
+		close(statChan)
+	}()
 
-		// The success channel is used do tell the "watcher" that the stat
-		// finished successfully. The channel is closed on success.
-		success := make(chan struct{})
-		go stuckMountWatcher(labels.mountPoint, success, c.logger)
+	for stat := range statChan {
+		stats = append(stats, stat)
+	}
+	return stats, nil
+}
 
-		buf := new(unix.Statfs_t)
-		err = unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
-		stuckMountsMtx.Lock()
-		close(success)
-		// If the mount has been marked as stuck, unmark it and log it's recovery.
-		if _, ok := stuckMounts[labels.mountPoint]; ok {
-			level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
-			delete(stuckMounts, labels.mountPoint)
-		}
-		stuckMountsMtx.Unlock()
+// processStat stats a single mount point, giving up after mountTimeout, and
+// returns the resulting filesystemStats. A failed or timed-out statfs call is
+// reported with deviceError set to 1.
+func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemStats {
+	// The done channel is used to tell the "watcher" that the stat call completed.
+	// It is buffered so that runStatfs never blocks on the send while holding
+	// stuckMountsMtx (waitStatfs takes the same mutex on timeout, so a blocking
+	// send could deadlock). It is never closed: a late send would then panic.
+	done := make(chan error, 1)
 
-		if err != nil {
-			stats = append(stats, filesystemStats{
-				labels:      labels,
-				deviceError: 1,
-			})
+	// Run Statfs call which may hang.
+	buf := new(unix.Statfs_t)
+	go runStatfs(buf, labels, done, c.logger)
 
-			level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
-			continue
+	// Waits for the Statfs call to complete or time out. If the call does not time
+	// out, err is whatever Statfs returned. Otherwise, err reports the timeout.
+	err := waitStatfs(labels.mountPoint, done, c.logger)
+	if err != nil {
+		level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
+		return filesystemStats{
+			labels:      labels,
+			deviceError: 1,
 		}
+	}
 
-		var ro float64
-		for _, option := range strings.Split(labels.options, ",") {
-			if option == "ro" {
-				ro = 1
-				break
-			}
+	var ro float64
+	for _, option := range strings.Split(labels.options, ",") {
+		if option == "ro" {
+			ro = 1
+			break
 		}
+	}
 
-		stats = append(stats, filesystemStats{
-			labels:    labels,
-			size:      float64(buf.Blocks) * float64(buf.Bsize),
-			free:      float64(buf.Bfree) * float64(buf.Bsize),
-			avail:     float64(buf.Bavail) * float64(buf.Bsize),
-			files:     float64(buf.Files),
-			filesFree: float64(buf.Ffree),
-			ro:        ro,
-		})
+	return filesystemStats{
+		labels:    labels,
+		size:      float64(buf.Blocks) * float64(buf.Bsize),
+		free:      float64(buf.Bfree) * float64(buf.Bsize),
+		avail:     float64(buf.Bavail) * float64(buf.Bsize),
+		files:     float64(buf.Files),
+		filesFree: float64(buf.Ffree),
+		ro:        ro,
 	}
-	return stats, nil
 }
 
-// stuckMountWatcher listens on the given success channel and if the channel closes
-// then the watcher does nothing. If instead the timeout is reached, the
-// mount point that is being watched is marked as stuck.
-func stuckMountWatcher(mountPoint string, success chan struct{}, logger log.Logger) {
+// runStatfs runs the unix.Statfs call, which may hang, and delivers its result on
+// done. If the mount point was marked as stuck while the call was in flight, the
+// waiter has already given up, so the mark is cleared and nothing is sent.
+// done must have room for one value so the send never blocks while
+// stuckMountsMtx is held.
+func runStatfs(buf *unix.Statfs_t, labels filesystemLabels, done chan error, logger log.Logger) {
+	err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
+	stuckMountsMtx.Lock()
+	defer stuckMountsMtx.Unlock()
+	// If the mount has been marked as stuck, unmark it and log its recovery.
+	if _, ok := stuckMounts[labels.mountPoint]; ok {
+		level.Debug(logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
+		delete(stuckMounts, labels.mountPoint)
+		return
+	}
+	done <- err
+}
+
+// waitStatfs listens on the given done channel and returns whatever error is
+// returned by the Statfs call. If instead the timeout is reached, the mount point
+// is marked as stuck and an error is returned.
+func waitStatfs(mountPoint string, done chan error, logger log.Logger) error {
+	var err error
 	select {
-	case <-success:
-		// Success
+	case err = <-done:
+		// The Statfs call completed in time; err holds its result.
 	case <-time.After(*mountTimeout):
-		// Timed out, mark mount as stuck
+		// Timed out, mark mount as stuck.
 		stuckMountsMtx.Lock()
 		select {
-		case <-success:
-			// Success came in just after the timeout was reached, don't label the mount as stuck
+		case err = <-done:
+			// The result came in just after the timeout was reached, don't label the mount as stuck.
 		default:
+			err = errors.New("Statfs timed out")
 			level.Debug(logger).Log("msg", "Mount point timed out, it is being labeled as stuck and will not be monitored", "mountpoint", mountPoint)
 			stuckMounts[mountPoint] = struct{}{}
 		}
 		stuckMountsMtx.Unlock()
 	}
+	return err
 }
 
 func mountPointDetails(logger log.Logger) ([]filesystemLabels, error) {