Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 88 additions & 56 deletions collector/filesystem_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
"how long to wait for a mount to respond before marking it as stale").
Hidden().Default("5s").Duration()
var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers",
"how many stat calls to process simultaneously").
Hidden().Default("4").Int()
var stuckMounts = make(map[string]struct{})
var stuckMountsMtx = &sync.Mutex{}

Expand All @@ -50,72 +53,101 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
return nil, err
}
stats := []filesystemStats{}
for _, labels := range mps {
if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
continue
}
if c.excludedFSTypesPattern.MatchString(labels.fsType) {
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
continue
}
stuckMountsMtx.Lock()
if _, ok := stuckMounts[labels.mountPoint]; ok {
stats = append(stats, filesystemStats{
labels: labels,
deviceError: 1,
})
level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
labelChan := make(chan filesystemLabels)
statChan := make(chan filesystemStats)
wg := sync.WaitGroup{}

workerCount := *statWorkerCount
if workerCount < 1 {
workerCount = 1
}

for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for labels := range labelChan {
statChan <- c.processStat(labels)
}
}()
}

go func() {
for _, labels := range mps {
if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
continue
}
if c.excludedFSTypesPattern.MatchString(labels.fsType) {
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
continue
}

stuckMountsMtx.Lock()
if _, ok := stuckMounts[labels.mountPoint]; ok {
stats = append(stats, filesystemStats{
labels: labels,
deviceError: 1,
})
level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
stuckMountsMtx.Unlock()
continue
}

stuckMountsMtx.Unlock()
continue
labelChan <- labels
}
stuckMountsMtx.Unlock()
close(labelChan)
wg.Wait()
close(statChan)
}()

// The success channel is used do tell the "watcher" that the stat
// finished successfully. The channel is closed on success.
success := make(chan struct{})
go stuckMountWatcher(labels.mountPoint, success, c.logger)
for stat := range statChan {
stats = append(stats, stat)
}
return stats, nil
}

buf := new(unix.Statfs_t)
err = unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
stuckMountsMtx.Lock()
close(success)
// If the mount has been marked as stuck, unmark it and log it's recovery.
if _, ok := stuckMounts[labels.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
delete(stuckMounts, labels.mountPoint)
}
stuckMountsMtx.Unlock()
func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemStats {
success := make(chan struct{})
go stuckMountWatcher(labels.mountPoint, success, c.logger)

if err != nil {
stats = append(stats, filesystemStats{
labels: labels,
deviceError: 1,
})
buf := new(unix.Statfs_t)
err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
stuckMountsMtx.Lock()
close(success)

level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
continue
}
// If the mount has been marked as stuck, unmark it and log it's recovery.
if _, ok := stuckMounts[labels.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
delete(stuckMounts, labels.mountPoint)
}
stuckMountsMtx.Unlock()

var ro float64
for _, option := range strings.Split(labels.options, ",") {
if option == "ro" {
ro = 1
break
}
if err != nil {
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
return filesystemStats{
labels: labels,
deviceError: 1,
}
}

stats = append(stats, filesystemStats{
labels: labels,
size: float64(buf.Blocks) * float64(buf.Bsize),
free: float64(buf.Bfree) * float64(buf.Bsize),
avail: float64(buf.Bavail) * float64(buf.Bsize),
files: float64(buf.Files),
filesFree: float64(buf.Ffree),
ro: ro,
})
var ro float64
for _, option := range strings.Split(labels.options, ",") {
if option == "ro" {
ro = 1
break
}
}
return filesystemStats{
labels: labels,
size: float64(buf.Blocks) * float64(buf.Bsize),
free: float64(buf.Bfree) * float64(buf.Bsize),
avail: float64(buf.Bavail) * float64(buf.Bsize),
files: float64(buf.Files),
filesFree: float64(buf.Ffree),
ro: ro,
}
return stats, nil
}

// stuckMountWatcher listens on the given success channel and if the channel closes
Expand Down