From ee763cd4de7f8df4dcbfdb04a00b2cdfc1118a4c Mon Sep 17 00:00:00 2001 From: Stewart Robertson Date: Wed, 20 Oct 2021 18:24:57 +0100 Subject: [PATCH] fixes #411 --- .../handlers/transports/background_thread.py | 29 +++++++++++-------- .../transports/test_background_thread.py | 24 ++++++++++----- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/google/cloud/logging_v2/handlers/transports/background_thread.py b/google/cloud/logging_v2/handlers/transports/background_thread.py index 60828a117..c32fa056d 100644 --- a/google/cloud/logging_v2/handlers/transports/background_thread.py +++ b/google/cloud/logging_v2/handlers/transports/background_thread.py @@ -29,6 +29,7 @@ from google.cloud.logging_v2 import _helpers from google.cloud.logging_v2.handlers.transports.base import Transport +from google.cloud.logging_v2.services.config_service_v2.transports.base import DEFAULT_CLIENT_INFO _DEFAULT_GRACE_PERIOD = 5.0 # Seconds _DEFAULT_MAX_BATCH_SIZE = 10 @@ -221,21 +222,12 @@ def _main_thread_terminated(self): file=sys.stderr, ) - def enqueue(self, record, message, **kwargs): + def enqueue(self, queue_entry): """Queues a log entry to be written by the background thread. Args: - record (logging.LogRecord): Python log record that the handler was called with. - message (str): The message from the ``LogRecord`` after being - formatted by the associated log formatters. - kwargs: Additional optional arguments for the logger + queue_entry: Formatted log entry payload. """ - queue_entry = { - "info": {"message": message, "python_logger": record.name}, - "severity": _helpers._normalize_severity(record.levelno), - "timestamp": datetime.datetime.utcfromtimestamp(record.created), - } - queue_entry.update(kwargs) self._queue.put_nowait(queue_entry) def flush(self): @@ -243,6 +235,16 @@ def flush(self): self._queue.join() +class PayloadFormatter(object): + def format(self, record, message, **kwargs): + queue_entry = { + "info": {"message": message, "python_logger": record.name}, + "severity": _helpers._normalize_severity(record.levelno), + "timestamp": datetime.datetime.utcfromtimestamp(record.created), + } + queue_entry.update(kwargs) + return queue_entry + class BackgroundThreadTransport(Transport): """Asynchronous transport that uses a background thread.""" @@ -254,6 +256,7 @@ def __init__( grace_period=_DEFAULT_GRACE_PERIOD, batch_size=_DEFAULT_MAX_BATCH_SIZE, max_latency=_DEFAULT_MAX_LATENCY, + payload_formatter=PayloadFormatter ): """ Args: @@ -279,6 +282,7 @@ def __init__( max_latency=max_latency, ) self.worker.start() + self.payload_formatter = payload_formatter() def send(self, record, message, **kwargs): """Overrides Transport.send(). @@ -289,7 +293,8 @@ def send(self, record, message, **kwargs): formatted by the associated log formatters. kwargs: Additional optional arguments for the logger """ - self.worker.enqueue(record, message, **kwargs) + payload = self.payload_formatter.format(record, message, **kwargs) + self.worker.enqueue(payload) def flush(self): """Submit any pending log records.""" diff --git a/tests/unit/handlers/transports/test_background_thread.py b/tests/unit/handlers/transports/test_background_thread.py index 1666cd74b..50ba4a61a 100644 --- a/tests/unit/handlers/transports/test_background_thread.py +++ b/tests/unit/handlers/transports/test_background_thread.py @@ -16,9 +16,11 @@ import logging import queue import unittest +from unittest.case import expectedFailure import mock +from google.cloud.logging_v2.handlers.transports.background_thread import PayloadFormatter class TestBackgroundThreadHandler(unittest.TestCase): PROJECT = "PROJECT" @@ -62,11 +64,12 @@ def test_send(self): ) transport.send(record, message, resource=_GLOBAL_RESOURCE) - - transport.worker.enqueue.assert_called_once_with( - record, message, resource=_GLOBAL_RESOURCE, + expected_payload = transport.payload_formatter.format( + record, message, resource=_GLOBAL_RESOURCE ) + transport.worker.enqueue.assert_called_once_with(expected_payload) + def test_trace_send(self): from google.cloud.logging_v2.logger import _GLOBAL_RESOURCE @@ -85,8 +88,11 @@ def test_trace_send(self): transport.send(record, message, resource=_GLOBAL_RESOURCE, trace=trace) + expected_payload = transport.payload_formatter.format( + record, message, resource=_GLOBAL_RESOURCE, trace=trace + ) transport.worker.enqueue.assert_called_once_with( - record, message, resource=_GLOBAL_RESOURCE, trace=trace, + expected_payload ) def test_span_send(self): @@ -107,8 +113,9 @@ def test_span_send(self): transport.send(record, message, resource=_GLOBAL_RESOURCE, span_id=span_id) + expected_payload = transport.payload_formatter.format(record, message, resource=_GLOBAL_RESOURCE, span_id=span_id) transport.worker.enqueue.assert_called_once_with( - record, message, resource=_GLOBAL_RESOURCE, span_id=span_id, + expected_payload ) def test_flush(self): @@ -246,7 +253,7 @@ def test__main_thread_terminated_non_empty_queue(self): self._start_with_thread_patch(worker) record = mock.Mock() record.created = time.time() - worker.enqueue(record, "") + worker.enqueue(record) worker._main_thread_terminated() self.assertFalse(worker.is_alive) @@ -258,7 +265,7 @@ def test__main_thread_terminated_did_not_join(self): worker._thread._terminate_on_join = False record = mock.Mock() record.created = time.time() - worker.enqueue(record, "") + worker.enqueue(record) worker._main_thread_terminated() self.assertFalse(worker.is_alive) @@ -266,7 +273,8 @@ def test__main_thread_terminated_did_not_join(self): @staticmethod def _enqueue_record(worker, message, levelno=logging.INFO, **kw): record = logging.LogRecord("testing", levelno, None, None, message, None, None) - worker.enqueue(record, message, **kw) + payload = PayloadFormatter().format(record, message, **kw) + worker.enqueue(payload) def test_enqueue_defaults(self): import datetime