Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/collector/dist/utils/containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# (C) Datadog, Inc. 2010-2016
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

def freeze(o):
"""
Freezes any mutable object including dictionaries and lists for hashing.
Accepts nested dictionaries.
"""
if isinstance(o, dict):
return frozenset(dict([(k, freeze(v)) for k,v in o.iteritems()]).iteritems())

if isinstance(o, list):
return tuple([freeze(v) for v in o])

return o

def hash_mutable(m):
return hash(freeze(m))
106 changes: 106 additions & 0 deletions pkg/collector/dist/utils/tailfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# (C) Datadog, Inc. 2010-2016
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

import binascii
import os
from stat import ST_INO, ST_SIZE


class TailFile(object):

CRC_SIZE = 16

def __init__(self, logger, path, callback):
self._path = path
self._f = None
self._inode = None
self._size = 0
self._crc = None
self._log = logger
self._callback = callback

def _open_file(self, move_end=False, pos=False):

already_open = False
# close and reopen to handle logrotate
if self._f is not None:
self._f.close()
self._f = None
already_open = True

stat = os.stat(self._path)
inode = stat[ST_INO]
size = stat[ST_SIZE]

# Compute CRC of the beginning of the file
crc = None
if size >= self.CRC_SIZE:
tmp_file = open(self._path, 'r')
data = tmp_file.read(self.CRC_SIZE)
crc = binascii.crc32(data)

if already_open:
# Check if file has been removed
if self._inode is not None and inode != self._inode:
self._log.debug("File removed, reopening")
move_end = False
pos = False

# Check if file has been truncated
elif self._size > 0 and size < self._size:
self._log.debug("File truncated, reopening")
move_end = False
pos = False

# Check if file has been truncated and too much data has
# alrady been written (copytruncate and opened files...)
if size >= self.CRC_SIZE and self._crc is not None and crc != self._crc:
self._log.debug("Begining of file modified, reopening")
move_end = False
pos = False

self._inode = inode
self._size = size
self._crc = crc

self._f = open(self._path, 'r')
if move_end:
self._log.debug("Opening file %s" % (self._path))
self._f.seek(0, os.SEEK_END)
elif pos:
self._log.debug("Reopening file %s at %s" % (self._path, pos))
self._f.seek(pos)

return True

def tail(self, line_by_line=True, move_end=True):
"""Read line-by-line and run callback on each line.
line_by_line: yield each time a callback has returned True
move_end: start from the last line of the log"""
try:
self._open_file(move_end=move_end)

while True:
pos = self._f.tell()
line = self._f.readline()
if line:
line = line.strip(chr(0)) # a truncate may have create holes in the file
if self._callback(line.rstrip("\n")):
if line_by_line:
yield True
pos = self._f.tell()
self._open_file(move_end=False, pos=pos)
else:
continue
else:
continue
else:
yield True
assert pos == self._f.tell()
self._open_file(move_end=False, pos=pos)

except Exception as e:
# log but survive
self._log.exception(e)
raise StopIteration(e)
74 changes: 74 additions & 0 deletions pkg/collector/dist/utils/timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# (C) Datadog, Inc. 2010-2016
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

from threading import Thread
import functools

_thread_by_func = {}


class TimeoutException(Exception):
"""
Raised when a function runtime exceeds the limit set.
"""
pass


class ThreadMethod(Thread):
"""
Descendant of `Thread` class.

Run the specified target method with the specified arguments.
Store result and exceptions.

From: https://code.activestate.com/recipes/440569/
"""
def __init__(self, target, args, kwargs):
Thread.__init__(self)
self.setDaemon(True)
self.target, self.args, self.kwargs = target, args, kwargs
self.start()

def run(self):
try:
self.result = self.target(*self.args, **self.kwargs)
except Exception as e:
self.exception = e
else:
self.exception = None


def timeout(timeout):
"""
A decorator to timeout a function. Decorated method calls are executed in a separate new thread
with a specified timeout.
Also check if a thread for the same function already exists before creating a new one.

Note: Compatible with Windows (thread based).
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = "{0}:{1}:{2}:{3}".format(id(func), func.__name__, args, kwargs)

if key in _thread_by_func:
# A thread for the same function already exists.
worker = _thread_by_func[key]
else:
worker = ThreadMethod(func, args, kwargs)
_thread_by_func[key] = worker

worker.join(timeout)
if worker.is_alive():
raise TimeoutException()

del _thread_by_func[key]

if worker.exception:
raise worker.exception
else:
return worker.result

return wrapper
return decorator
29 changes: 29 additions & 0 deletions pkg/collector/dist/utils/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# (C) Datadog, Inc. 2010-2017
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

# stdlib
import time

class Timer(object):
""" Helper class """

def __init__(self):
self.start()

def _now(self):
return time.time()

def start(self):
self.started = self._now()
self.last = self.started
return self

def step(self):
now = self._now()
step = now - self.last
self.last = now
return step

def total(self, as_sec=True):
return self._now() - self.started