Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
python-logging-loki
===================
# python-logging-loki

[![PyPI version](https://img.shields.io/pypi/v/python-logging-loki.svg)](https://pypi.org/project/python-logging-loki/)
[![Python version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-blue.svg)](https://www.python.org/)
Expand All @@ -9,45 +8,44 @@ python-logging-loki
Python logging handler for Loki.
https://grafana.com/loki

Installation
============
# Installation

```bash
pip install python-logging-loki
```

Usage
=====
# Usage

```python
import logging
import logging_loki


handler = logging_loki.LokiHandler(
url="https://my-loki-instance/loki/api/v1/push",
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app"},
headers={"X-Scope-OrgID": "example-id"},
auth=("username", "password"),
version="1",
props_to_labels=["foo"],
)

logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.error(
"Something happened",
"Something happened",
extra={"tags": {"service": "my-service"}},
)
```

Example above will send `Something happened` message along with these labels:

- Default labels from handler
- Message level as `severity`
- Logger's name as `logger`
- Logger's name as `logger`
- Labels from `tags` item of `extra` dict

The given example is blocking (i.e. each call will wait for the message to be sent).
But you can use the built-in `QueueHandler` and `QueueListener` to send messages in a separate thread.
But you can use the built-in `QueueHandler` and `QueueListener` to send messages in a separate thread.

```python
import logging.handlers
Expand All @@ -58,11 +56,10 @@ from multiprocessing import Queue
queue = Queue(-1)
handler = logging.handlers.QueueHandler(queue)
handler_loki = logging_loki.LokiHandler(
url="https://my-loki-instance/loki/api/v1/push",
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app"},
headers={"X-Scope-OrgID": "example-id"},
auth=("username", "password"),
version="1",
props_to_labels=["foo"],
)
logging.handlers.QueueListener(queue, handler_loki)
Expand All @@ -82,10 +79,9 @@ from multiprocessing import Queue

handler = logging_loki.LokiQueueHandler(
Queue(-1),
url="https://my-loki-instance/loki/api/v1/push",
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app"},
auth=("username", "password"),
version="1",
)

logger = logging.getLogger("my-logger")
Expand Down
2 changes: 1 addition & 1 deletion logging_loki/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
from logging_loki.handlers import LokiQueueHandler

__all__ = ["LokiHandler", "LokiQueueHandler"]
__version__ = "0.3.1"
__version__ = "0.4.0-beta"
name = "logging_loki"
56 changes: 14 additions & 42 deletions logging_loki/emitter.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
# -*- coding: utf-8 -*-

import abc
import copy
import functools
import json
import logging
import threading
import time
from logging.config import ConvertingDict
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Any, Dict, Optional, Tuple

import requests
import rfc3339

from logging_loki import const

BasicAuth = Optional[Tuple[str, str]]


class LokiEmitter(abc.ABC):
class LokiEmitter:
"""Base Loki emitter class."""

success_response_code = const.success_response_code
Expand Down Expand Up @@ -72,16 +66,16 @@ def __call__(self, record: logging.LogRecord, line: str):
return
try:
payload = self.build_payload(record, line)
resp = self.session.post(self.url, json=payload, headers=self.headers)
if resp.status_code != self.success_response_code:
raise ValueError("Unexpected Loki API response status code: {0}".format(resp.status_code))
self._post_to_loki(payload)
finally:
self._lock.release()

@abc.abstractmethod
def build_payload(self, record: logging.LogRecord, line) -> dict:
"""Build JSON payload with a log entry."""
raise NotImplementedError # pragma: no cover
def _post_to_loki(self, payload: dict):
    """Push an already-built payload to the configured Loki endpoint.

    Args:
        payload: Request body, passed to ``session.post`` through its ``json`` argument.

    Raises:
        ValueError: If the response status code differs from ``success_response_code``.
    """
    response = self.session.post(self.url, json=payload, headers=self.headers)
    # TODO: Enqueue logs instead of raising an error and losing the logs
    status = response.status_code
    if status == self.success_response_code:
        return
    raise ValueError("Unexpected Loki API response status code: {0}".format(status))


@property
def session(self) -> requests.Session:
Expand Down Expand Up @@ -127,33 +121,6 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:

return tags


class LokiEmitterV0(LokiEmitter):
    """Emitter for Loki < 0.4.0."""

    def build_payload(self, record: logging.LogRecord, line) -> dict:
        """Wrap one log line in a single-stream payload with a label string and a timestamped entry."""
        label_string = self.build_labels(record)
        entry = {"ts": rfc3339.format_microsecond(record.created), "line": line}
        return {"streams": [{"labels": label_string, "entries": [entry]}]}

    def build_labels(self, record: logging.LogRecord) -> str:
        """Render the record's tags as a ``{name="value",...}`` label string."""
        # Names go through ``format_label``; double quotes in values are backslash-escaped.
        rendered = (
            '{0}="{1}"'.format(self.format_label(str(name)), str(value).replace('"', r"\""))
            for name, value in self.build_tags(record).items()
        )
        return "{{{0}}}".format(",".join(rendered))


class LokiEmitterV1(LokiEmitter):
"""Emitter for Loki >= 0.4.0."""

def build_payload(self, record: logging.LogRecord, line) -> dict:
"""Build JSON payload with a log entry."""
labels = self.build_tags(record)
Expand All @@ -167,3 +134,8 @@ def build_payload(self, record: logging.LogRecord, line) -> dict:
"values": [[ts, line]],
}
return {"streams": [stream]}

def emit_batch(self, records: list[Tuple[logging.LogRecord, str]]):
    """Send several log records to Loki in one push request.

    Args:
        records: Pairs of (log record, formatted line). Each pair is turned into a
            payload with ``build_payload`` and its first stream is collected.
    """
    streams = []
    for entry in records:
        single_payload = self.build_payload(entry[0], entry[1])
        streams.append(single_payload["streams"][0])
    self._post_to_loki({"streams": streams})
90 changes: 59 additions & 31 deletions logging_loki/handlers.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
# -*- coding: utf-8 -*-

import logging
import warnings
from logging.handlers import QueueHandler
from logging.handlers import MemoryHandler, QueueHandler
from logging.handlers import QueueListener
import os
from queue import Queue
from typing import Dict
from typing import Optional
from typing import Type
import time
from typing import Optional, Union

from logging_loki import const
from logging_loki import emitter
from logging_loki.emitter import BasicAuth, LokiEmitter

LOKI_MAX_BATCH_BUFFER_SIZE = int(os.environ.get('LOKI_MAX_BATCH_BUFFER_SIZE', 10))

class LokiQueueHandler(QueueHandler):
"""This handler automatically creates listener and `LokiHandler` to handle logs queue."""

def __init__(self, queue: Queue, **kwargs):
handler: Union['LokiBatchHandler', 'LokiHandler']

def __init__(self, queue: Queue, batch_interval: Optional[float] = None, **kwargs):
"""Create new logger handler with the specified queue and kwargs for the `LokiHandler`."""
super().__init__(queue)
self.handler = LokiHandler(**kwargs) # noqa: WPS110

loki_handler = LokiHandler(**kwargs) # noqa: WPS110
self.handler = LokiBatchHandler(batch_interval, target=loki_handler) if batch_interval else loki_handler

self.listener = QueueListener(self.queue, self.handler)
self.listener.start()

def flush(self) -> None:
"""Flush this queue handler, then flush the wrapped handler stored in ``self.handler``."""
super().flush()
self.handler.flush()

def __del__(self):
    """Stop the background queue listener when the handler is finalised.

    ``__del__`` also runs on instances whose ``__init__`` raised before
    ``self.listener`` was assigned, so the attribute is looked up defensively
    instead of raising ``AttributeError`` during finalisation.
    """
    listener = getattr(self, "listener", None)
    if listener is not None:
        listener.stop()

Expand All @@ -33,20 +41,16 @@ class LokiHandler(logging.Handler):
`Loki API <https://github.com/grafana/loki/blob/master/docs/api.md>`_
"""

emitters: Dict[str, Type[emitter.LokiEmitter]] = {
"0": emitter.LokiEmitterV0,
"1": emitter.LokiEmitterV1,
}
emitter: LokiEmitter

def __init__(
self,
url: str,
tags: Optional[dict] = None,
headers: Optional[dict] = None,
auth: Optional[emitter.BasicAuth] = None,
version: Optional[str] = None,
auth: Optional[BasicAuth] = None,
as_json: Optional[bool] = False,
props_to_labels: Optional[list[str]] = None
props_to_labels: Optional[list[str]] = None,
):
"""
Create new Loki logging handler.
Expand All @@ -55,24 +59,13 @@ def __init__(
url: Endpoint used to send log entries to Loki (e.g. `https://my-loki-instance/loki/api/v1/push`).
tags: Default tags added to every log record.
auth: Optional tuple with username and password for basic HTTP authentication.
version: Version of Loki emitter to use.
headers: Optional record with headers that are send with each POST to loki.
as_json: Flag to support sending entire JSON record instead of only the message.
props_to_labels: List of properties that should be converted to loki labels.

"""
super().__init__()

if version is None and const.emitter_ver == "0":
msg = (
"Loki /api/prom/push endpoint is in the depreciation process starting from version 0.4.0.",
"Explicitly set the emitter version to '0' if you want to use the old endpoint.",
"Or specify '1' if you have Loki version> = 0.4.0.",
"When the old API is removed from Loki, the handler will use the new version by default.",
)
warnings.warn(" ".join(msg), DeprecationWarning)

version = version or const.emitter_ver
if version not in self.emitters:
raise ValueError("Unknown emitter version: {0}".format(version))
self.emitter = self.emitters[version](url, tags, headers, auth, as_json, props_to_labels)
self.emitter = LokiEmitter(url, tags, headers, auth, as_json, props_to_labels)

def handleError(self, record): # noqa: N802
"""Close emitter and let default handler take actions on error."""
Expand All @@ -86,3 +79,38 @@ def emit(self, record: logging.LogRecord):
self.emitter(record, self.format(record))
except Exception:
self.handleError(record)

def emit_batch(self, records: list[logging.LogRecord]):
    """Format every record and hand the whole batch to the emitter in one call.

    If formatting or emitting raises, ``handleError`` is invoked once for each
    record in the batch.
    """
    # noinspection PyBroadException
    try:
        formatted = []
        for item in records:
            formatted.append((item, self.format(item)))
        self.emitter.emit_batch(formatted)
    except Exception:
        for failed in records:
            self.handleError(failed)

class LokiBatchHandler(MemoryHandler):
interval: float # The interval at which batched logs are sent in seconds
_last_flush_time: float
target: LokiHandler

def __init__(self, interval: float, capacity: int = LOKI_MAX_BATCH_BUFFER_SIZE, **kwargs):
super().__init__(capacity, **kwargs)
self.interval = interval
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that in LokiQueueHandler the batch_interval is optional, but when it gets propagated to LokiBatchHandler we don't treat interval as optional. Maybe I'm missing something. My worry is the (time.time() - self._last_flush_time >= self.interval) condition used in the shouldFlush function, because we'll get a TypeError if self.interval is None.

Copy link
Copy Markdown
Author

@simosho simosho Sep 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if batch_interval:
self.handler = LokiBatchHandler(batch_interval, target=loki_handler)
else:
self.handler = loki_handler

In the LokiQueueHandler constructor there is a check whether or not batch_interval is set. This should be sufficient, right?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I see this chunky code, I'm going to change it to use a ternary operator.

Copy link
Copy Markdown
Member

@plankthom plankthom Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably too late for this library, but I would (here and elsewhere) only use dependency injection in the `__init__` methods, and not create objects from properties (like the target handler in this case). The `__init__` should only be a factory method for the class itself, not its members.

You can then still have other static builder/factory methods that create the objects and stitches them together.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to add dependency injection as well, but since this repo is a fork, I just want to make minimal changes

self._last_flush_time = time.time()

def flush(self) -> None:
self.acquire()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this acquired lock imply that all logs made during the flush are lost because of the

if not self._lock.acquire(blocking=False):
            return

in __call__?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM, it's a lock on another object ... (the emitter, not the handler)

Copy link
Copy Markdown
Author

@simosho simosho Sep 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, the handler lock is to prevent the entire buffer of logs to be flushed/sent twice

The other lock used in the emitter is to prevent the emitter from creating an infinite loop:
when it's forwarding a log line to Loki, we don't want to forward any newly created log lines produced inside the emit method itself (e.g. from urllib/httpx/requests because of the request to Loki)

try:
if self.target and self.buffer:
self.target.emit_batch(self.buffer)
self.buffer.clear()
finally:
self._last_flush_time = time.time()
self.release()

def shouldFlush(self, record: logging.LogRecord) -> bool:
    """Decide whether the buffered records should be flushed now.

    Returns True when the parent class's ``shouldFlush`` says so, or when at
    least ``self.interval`` seconds have passed since ``self._last_flush_time``.
    """
    if super().shouldFlush(record):
        return True
    elapsed = time.time() - self._last_flush_time
    return elapsed >= self.interval
Loading