Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions logtail/flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@


class FlushWorker(threading.Thread):
def __init__(self, upload, pipe, buffer_capacity, flush_interval):
def __init__(self, upload, pipe, buffer_capacity, flush_interval, check_interval):
threading.Thread.__init__(self)
self.parent_thread = threading.current_thread()
self.upload = upload
self.pipe = pipe
self.buffer_capacity = buffer_capacity
self.flush_interval = flush_interval
self.check_interval = check_interval
self.should_run = True
self._flushing = False
self._clean = True

def run(self):
while self.should_run:
Expand All @@ -27,6 +30,7 @@ def step(self):
last_flush = time.time()
time_remaining = _initial_time_remaining(self.flush_interval)
frame = []
self._clean = True

# If the parent thread has exited but there are still outstanding
# events, attempt to send them before exiting.
Expand All @@ -38,16 +42,17 @@ def step(self):
# `flush_interval` seconds have passed without sending any events.
while len(frame) < self.buffer_capacity and time_remaining > 0:
try:
# Blocks for up to 1.0 seconds for each item to prevent
# Blocks for up to `check_interval` seconds for each item to prevent
# spinning and burning CPU unnecessarily. Could block for the
# entire amount of `time_remaining` but then in the case that
# the parent thread has exited, that entire amount of time
# would be waited before this child worker thread exits.
entry = self.pipe.get(block=(not shutdown), timeout=1.0)
entry = self.pipe.get(block=(not shutdown), timeout=self.check_interval)
self._clean = False
frame.append(entry)
self.pipe.task_done()
except queue.Empty:
if shutdown:
if shutdown or self._flushing:
break
shutdown = not self.parent_thread.is_alive()
time_remaining = _calculate_time_remaining(last_flush, self.flush_interval)
Expand All @@ -68,9 +73,15 @@ def step(self):
if response.status_code == 500 and getattr(response, "exception") != None:
print('Failed to send logs to Better Stack after {} retries: {}'.format(len(RETRY_SCHEDULE), response.exception))

self._clean = True
if shutdown and self.pipe.empty():
self.should_run = False

def flush(self):
self._flushing = True
while not self._clean or not self.pipe.empty():
time.sleep(self.check_interval)
self._flushing = False

def _initial_time_remaining(flush_interval):
return flush_interval
Expand Down
10 changes: 9 additions & 1 deletion logtail/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DEFAULT_HOST = 'https://in.logs.betterstack.com'
DEFAULT_BUFFER_CAPACITY = 1000
DEFAULT_FLUSH_INTERVAL = 1
DEFAULT_CHECK_INTERVAL = 0.1
DEFAULT_RAISE_EXCEPTIONS = False
DEFAULT_DROP_EXTRA_EVENTS = True
DEFAULT_INCLUDE_EXTRA_ATTRIBUTES = True
Expand All @@ -23,6 +24,7 @@ def __init__(self,
host=DEFAULT_HOST,
buffer_capacity=DEFAULT_BUFFER_CAPACITY,
flush_interval=DEFAULT_FLUSH_INTERVAL,
check_interval=DEFAULT_CHECK_INTERVAL,
raise_exceptions=DEFAULT_RAISE_EXCEPTIONS,
drop_extra_events=DEFAULT_DROP_EXTRA_EVENTS,
include_extra_attributes=DEFAULT_INCLUDE_EXTRA_ATTRIBUTES,
Expand All @@ -38,6 +40,7 @@ def __init__(self,
self.include_extra_attributes = include_extra_attributes
self.buffer_capacity = buffer_capacity
self.flush_interval = flush_interval
self.check_interval = check_interval
self.raise_exceptions = raise_exceptions
self.dropcount = 0
# Do not initialize the flush thread yet because it causes issues on Render.
Expand All @@ -51,7 +54,8 @@ def ensure_flush_thread_alive(self):
self.uploader,
self.pipe,
self.buffer_capacity,
self.flush_interval
self.flush_interval,
self.check_interval,
)
self.flush_thread.start()

Expand All @@ -71,3 +75,7 @@ def emit(self, record):
except Exception as e:
if self.raise_exceptions:
raise e

def flush(self):
if self.flush_thread and self.flush_thread.is_alive():
self.flush_thread.flush()
13 changes: 8 additions & 5 deletions tests/test_flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import threading
import unittest

from unittest.mock import patch

from logtail.compat import queue
from logtail.flusher import RETRY_SCHEDULE
from logtail.flusher import FlushWorker
Expand All @@ -17,11 +19,12 @@ class TestFlushWorker(unittest.TestCase):
source_token = 'dummy_source_token'
buffer_capacity = 5
flush_interval = 2
check_interval = 0.01

def _setup_worker(self, uploader=None):
pipe = queue.Queue(maxsize=self.buffer_capacity)
uploader = uploader or Uploader(self.source_token, self.host)
fw = FlushWorker(uploader, pipe, self.buffer_capacity, self.flush_interval)
fw = FlushWorker(uploader, pipe, self.buffer_capacity, self.flush_interval, self.check_interval)
return pipe, uploader, fw

def test_is_thread(self):
Expand Down Expand Up @@ -50,7 +53,7 @@ def uploader(frame):

self.assertEqual(self.calls, 1)

@mock.patch('logtail.flusher._calculate_time_remaining')
@patch('logtail.flusher._calculate_time_remaining')
def test_flushes_after_interval(self, calculate_time_remaining):
self.buffer_capacity = 10
num_items = 2
Expand Down Expand Up @@ -82,8 +85,8 @@ def timeout(last_flush, interval):
self.assertEqual(self.upload_calls, 1)
self.assertEqual(self.timeout_calls, 2)

@mock.patch('logtail.flusher._calculate_time_remaining')
@mock.patch('logtail.flusher._initial_time_remaining')
@patch('logtail.flusher._calculate_time_remaining')
@patch('logtail.flusher._initial_time_remaining')
def test_does_nothing_without_any_items(self, initial_time_remaining, calculate_time_remaining):
calculate_time_remaining.side_effect = lambda a,b: 0.0
initial_time_remaining.side_effect = lambda a: 0.0001
Expand All @@ -95,7 +98,7 @@ def test_does_nothing_without_any_items(self, initial_time_remaining, calculate_
fw.step()
self.assertFalse(uploader.called)

@mock.patch('logtail.flusher.time.sleep')
@patch('logtail.flusher.time.sleep')
def test_retries_according_to_schedule(self, mock_sleep):
first_frame = list(range(self.buffer_capacity))

Expand Down
32 changes: 18 additions & 14 deletions tests/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
import unittest
import logging

from logtail import LogtailHandler, context
from unittest.mock import patch

from logtail import LogtailHandler, context
from logtail.handler import FlushWorker

class TestLogtailHandler(unittest.TestCase):
source_token = 'dummy_source_token'
host = 'dummy_host'

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_handler_creates_uploader_from_args(self, MockWorker):
handler = LogtailHandler(source_token=self.source_token, host=self.host)
self.assertEqual(handler.uploader.source_token, self.source_token)
self.assertEqual(handler.uploader.host, self.host)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_handler_creates_pipe_from_args(self, MockWorker):
buffer_capacity = 9
flush_interval = 1
Expand All @@ -30,11 +32,12 @@ def test_handler_creates_pipe_from_args(self, MockWorker):
)
self.assertTrue(handler.pipe.empty())

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_handler_creates_and_starts_worker_from_args_after_first_log(self, MockWorker):
buffer_capacity = 9
flush_interval = 9
handler = LogtailHandler(source_token=self.source_token, buffer_capacity=buffer_capacity, flush_interval=flush_interval)
check_interval = 4
handler = LogtailHandler(source_token=self.source_token, buffer_capacity=buffer_capacity, flush_interval=flush_interval, check_interval=check_interval)

self.assertFalse(MockWorker.called)

Expand All @@ -47,11 +50,12 @@ def test_handler_creates_and_starts_worker_from_args_after_first_log(self, MockW
handler.uploader,
handler.pipe,
buffer_capacity,
flush_interval
flush_interval,
check_interval,
)
self.assertEqual(handler.flush_thread.start.call_count, 1)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_emit_starts_thread_if_not_alive(self, MockWorker):
handler = LogtailHandler(source_token=self.source_token)

Expand All @@ -67,7 +71,7 @@ def test_emit_starts_thread_if_not_alive(self, MockWorker):

self.assertEqual(handler.flush_thread.start.call_count, 2)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_emit_drops_records_if_configured(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -87,7 +91,7 @@ def test_emit_drops_records_if_configured(self, MockWorker):
self.assertTrue(handler.pipe.empty())
self.assertEqual(handler.dropcount, 1)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_emit_does_not_drop_records_if_configured(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand Down Expand Up @@ -118,7 +122,7 @@ def consumer(q):

self.assertEqual(handler.dropcount, 0)

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_error_suppression(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -139,7 +143,7 @@ def test_error_suppression(self, MockWorker):
handler.raise_exceptions = False
logger.critical('hello')

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_can_send_unserializable_extra_data(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -158,7 +162,7 @@ def test_can_send_unserializable_extra_data(self, MockWorker):
self.assertRegex(log_entry['data']['unserializable'], r'^<tests\.test_handler\.UnserializableObject object at 0x[0-f]+>$')
self.assertTrue(handler.pipe.empty())

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_can_send_unserializable_context(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -178,7 +182,7 @@ def test_can_send_unserializable_context(self, MockWorker):
self.assertRegex(log_entry['context']['data']['unserializable'], r'^<tests\.test_handler\.UnserializableObject object at 0x[0-f]+>$')
self.assertTrue(handler.pipe.empty())

@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_can_send_circular_dependency_in_extra_data(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand All @@ -200,7 +204,7 @@ def test_can_send_circular_dependency_in_extra_data(self, MockWorker):
self.assertTrue(handler.pipe.empty())


@mock.patch('logtail.handler.FlushWorker')
@patch('logtail.handler.FlushWorker')
def test_can_send_circular_dependency_in_context(self, MockWorker):
buffer_capacity = 1
handler = LogtailHandler(
Expand Down
4 changes: 3 additions & 1 deletion tests/test_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import mock
import unittest

from unittest.mock import patch

from logtail.uploader import Uploader


Expand All @@ -12,7 +14,7 @@ class TestUploader(unittest.TestCase):
source_token = 'dummy_source_token'
frame = [1, 2, 3]

@mock.patch('logtail.uploader.requests.Session.post')
@patch('logtail.uploader.requests.Session.post')
def test_call(self, post):
def mock_post(endpoint, data=None, headers=None):
# Check that the data is sent to ther correct endpoint
Expand Down