diff --git a/pkg/collector/dist/utils/containers.py b/pkg/collector/dist/utils/containers.py new file mode 100644 index 000000000000..199d1ea5d61c --- /dev/null +++ b/pkg/collector/dist/utils/containers.py @@ -0,0 +1,19 @@ +# (C) Datadog, Inc. 2010-2016 +# All rights reserved +# Licensed under Simplified BSD License (see LICENSE) + +def freeze(o): + """ + Freezes any mutable object including dictionaries and lists for hashing. + Accepts nested dictionaries. + """ + if isinstance(o, dict): + return frozenset(dict([(k, freeze(v)) for k,v in o.iteritems()]).iteritems()) + + if isinstance(o, list): + return tuple([freeze(v) for v in o]) + + return o + +def hash_mutable(m): + return hash(freeze(m)) diff --git a/pkg/collector/dist/utils/tailfile.py b/pkg/collector/dist/utils/tailfile.py new file mode 100644 index 000000000000..ed9abdd9b9c6 --- /dev/null +++ b/pkg/collector/dist/utils/tailfile.py @@ -0,0 +1,106 @@ +# (C) Datadog, Inc. 2010-2016 +# All rights reserved +# Licensed under Simplified BSD License (see LICENSE) + +import binascii +import os +from stat import ST_INO, ST_SIZE + + +class TailFile(object): + + CRC_SIZE = 16 + + def __init__(self, logger, path, callback): + self._path = path + self._f = None + self._inode = None + self._size = 0 + self._crc = None + self._log = logger + self._callback = callback + + def _open_file(self, move_end=False, pos=False): + + already_open = False + # close and reopen to handle logrotate + if self._f is not None: + self._f.close() + self._f = None + already_open = True + + stat = os.stat(self._path) + inode = stat[ST_INO] + size = stat[ST_SIZE] + + # Compute CRC of the beginning of the file + crc = None + if size >= self.CRC_SIZE: + tmp_file = open(self._path, 'r') + data = tmp_file.read(self.CRC_SIZE) + crc = binascii.crc32(data) + + if already_open: + # Check if file has been removed + if self._inode is not None and inode != self._inode: + self._log.debug("File removed, reopening") + move_end = False + pos = False + + # Check if file has been truncated + elif self._size > 0 and size < self._size: + self._log.debug("File truncated, reopening") + move_end = False + pos = False + + # Check if file has been truncated and too much data has + # alrady been written (copytruncate and opened files...) + if size >= self.CRC_SIZE and self._crc is not None and crc != self._crc: + self._log.debug("Begining of file modified, reopening") + move_end = False + pos = False + + self._inode = inode + self._size = size + self._crc = crc + + self._f = open(self._path, 'r') + if move_end: + self._log.debug("Opening file %s" % (self._path)) + self._f.seek(0, os.SEEK_END) + elif pos: + self._log.debug("Reopening file %s at %s" % (self._path, pos)) + self._f.seek(pos) + + return True + + def tail(self, line_by_line=True, move_end=True): + """Read line-by-line and run callback on each line. + line_by_line: yield each time a callback has returned True + move_end: start from the last line of the log""" + try: + self._open_file(move_end=move_end) + + while True: + pos = self._f.tell() + line = self._f.readline() + if line: + line = line.strip(chr(0)) # a truncate may have create holes in the file + if self._callback(line.rstrip("\n")): + if line_by_line: + yield True + pos = self._f.tell() + self._open_file(move_end=False, pos=pos) + else: + continue + else: + continue + else: + yield True + assert pos == self._f.tell() + self._open_file(move_end=False, pos=pos) + + except Exception as e: + # log but survive + self._log.exception(e) + raise StopIteration(e) diff --git a/pkg/collector/dist/utils/timeout.py b/pkg/collector/dist/utils/timeout.py new file mode 100644 index 000000000000..f70386457425 --- /dev/null +++ b/pkg/collector/dist/utils/timeout.py @@ -0,0 +1,74 @@ +# (C) Datadog, Inc. 2010-2016 +# All rights reserved +# Licensed under Simplified BSD License (see LICENSE) + +from threading import Thread +import functools + +_thread_by_func = {} + + +class TimeoutException(Exception): + """ + Raised when a function runtime exceeds the limit set. + """ + pass + + +class ThreadMethod(Thread): + """ + Descendant of `Thread` class. + + Run the specified target method with the specified arguments. + Store result and exceptions. + + From: https://code.activestate.com/recipes/440569/ + """ + def __init__(self, target, args, kwargs): + Thread.__init__(self) + self.setDaemon(True) + self.target, self.args, self.kwargs = target, args, kwargs + self.start() + + def run(self): + try: + self.result = self.target(*self.args, **self.kwargs) + except Exception as e: + self.exception = e + else: + self.exception = None + + +def timeout(timeout): + """ + A decorator to timeout a function. Decorated method calls are executed in a separate new thread + with a specified timeout. + Also check if a thread for the same function already exists before creating a new one. + + Note: Compatible with Windows (thread based). + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + key = "{0}:{1}:{2}:{3}".format(id(func), func.__name__, args, kwargs) + + if key in _thread_by_func: + # A thread for the same function already exists. + worker = _thread_by_func[key] + else: + worker = ThreadMethod(func, args, kwargs) + _thread_by_func[key] = worker + + worker.join(timeout) + if worker.is_alive(): + raise TimeoutException() + + del _thread_by_func[key] + + if worker.exception: + raise worker.exception + else: + return worker.result + + return wrapper + return decorator diff --git a/pkg/collector/dist/utils/timer.py b/pkg/collector/dist/utils/timer.py new file mode 100644 index 000000000000..fb15ad8b0bbb --- /dev/null +++ b/pkg/collector/dist/utils/timer.py @@ -0,0 +1,29 @@ +# (C) Datadog, Inc. 2010-2017 +# All rights reserved +# Licensed under Simplified BSD License (see LICENSE) + +# stdlib +import time + +class Timer(object): + """ Helper class """ + + def __init__(self): + self.start() + + def _now(self): + return time.time() + + def start(self): + self.started = self._now() + self.last = self.started + return self + + def step(self): + now = self._now() + step = now - self.last + self.last = now + return step + + def total(self, as_sec=True): + return self._now() - self.started