Skip to content

Commit ec36dca

Browse files
committed
ref: Deduplicate batchers
1 parent 8159cae commit ec36dca

File tree

3 files changed

+184
-247
lines changed

3 files changed

+184
-247
lines changed

sentry_sdk/_batcher.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import os
2+
import random
3+
import threading
4+
from datetime import datetime, timezone
5+
from typing import Optional, List, Callable, TYPE_CHECKING, Any
6+
7+
from sentry_sdk.utils import format_timestamp, safe_repr, serialize_attribute
8+
from sentry_sdk.envelope import Envelope, Item, PayloadRef
9+
10+
if TYPE_CHECKING:
11+
from typing import TypeVar
12+
13+
T = TypeVar("T")
14+
15+
16+
class Batcher:
    """Buffer items and periodically flush them to Sentry as one envelope.

    Items are accumulated via :py:meth:`add`. A daemon flusher thread sends
    the buffered items roughly every ``FLUSH_WAIT_TIME`` seconds (with up to
    one second of jitter), or as soon as ``MAX_BEFORE_FLUSH`` items are
    buffered. Once the buffer holds ``MAX_BEFORE_DROP`` items, new items are
    dropped and handed to the ``_record_lost`` hook instead.

    Subclasses must set ``TYPE`` and ``CONTENT_TYPE`` and override
    ``_to_transport_format`` (and usually ``_record_lost``).
    """

    # Request an early flush once this many items are buffered.
    MAX_BEFORE_FLUSH = 100
    # Drop (and record as lost) any new item once the buffer holds this many.
    MAX_BEFORE_DROP = 1_000
    # Base number of seconds the flusher thread sleeps between flushes.
    FLUSH_WAIT_TIME = 5.0

    # Envelope item type and content type -- subclasses must override these.
    TYPE = ""
    CONTENT_TYPE = ""

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        # capture_func sends a finished envelope; record_lost_func reports
        # client-side data loss (used by subclasses' _record_lost hooks).
        self._buffer: "List[T]" = []
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()

        # Set to wake the flusher thread for an immediate flush.
        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        # PID the flusher was started in, to detect forked child processes.
        self._flusher_pid: "Optional[int]" = None

    def _ensure_thread(self) -> bool:
        """For forking processes we might need to restart this thread.
        This ensures that our process actually has that thread running.

        Returns ``False`` when the batcher has been shut down or the
        interpreter can no longer spawn threads.
        """
        if not self._running:
            return False

        pid = os.getpid()
        if self._flusher_pid == pid:
            return True

        with self._lock:
            # Recheck to make sure another thread didn't get here and start
            # the flusher in the meantime.
            if self._flusher_pid == pid:
                return True

            self._flusher_pid = pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state that no
                # longer allows us to spawn a thread and we have to bail.
                self._running = False
                return False

        return True

    def _flush_loop(self) -> None:
        # Daemon loop: flush periodically (jitter avoids synchronized flushes
        # across processes) or as soon as a flush is explicitly requested.
        while self._running:
            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
            self._flush_event.clear()
            self._flush()

    def add(self, item: "T") -> None:
        """Buffer *item* for the next flush; drop it if the buffer is full."""
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            if len(self._buffer) >= self.MAX_BEFORE_DROP:
                self._record_lost(item)
                return None

            self._buffer.append(item)
            if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()

    def kill(self) -> None:
        """Stop the flusher thread; already-buffered items are not sent."""
        if self._flusher is None:
            return

        self._running = False
        self._flush_event.set()
        self._flusher = None

    def flush(self) -> None:
        """Synchronously flush the current buffer on the calling thread."""
        self._flush()

    @staticmethod
    def _to_transport_format(item: "T") -> "Any":
        # Hook: convert one buffered item to its on-the-wire JSON form.
        # Must be overridden by subclasses; the base implementation yields None.
        pass

    def _add_to_envelope(self, envelope: "Envelope") -> None:
        # Serialize the entire buffer into a single envelope item.
        # NOTE: reads self._buffer without locking -- callers hold self._lock.
        envelope.add_item(
            Item(
                type=self.TYPE,
                content_type=self.CONTENT_TYPE,
                headers={
                    "item_count": len(self._buffer),
                },
                payload=PayloadRef(
                    json={
                        "items": [
                            self._to_transport_format(item) for item in self._buffer
                        ]
                    }
                ),
            )
        )

    def _record_lost(self, item: "T") -> None:
        # Hook: report *item* as lost (e.g. on queue overflow). No-op by
        # default; subclasses may override and call self._record_lost_func.
        pass

    def _flush(self) -> "Optional[Envelope]":
        """Send all buffered items as one envelope.

        Returns the captured envelope, or ``None`` when the buffer was empty.
        """
        with self._lock:
            if len(self._buffer) == 0:
                return None

            # Only build the envelope (and its sent_at timestamp) once we
            # know there is something to send.
            envelope = Envelope(
                headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
            )
            self._add_to_envelope(envelope)
            self._buffer.clear()

        self._capture_func(envelope)
        return envelope

sentry_sdk/_log_batcher.py

Lines changed: 33 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -4,161 +4,56 @@
44
from datetime import datetime, timezone
55
from typing import Optional, List, Callable, TYPE_CHECKING, Any
66

7+
from sentry_sdk._batcher import Batcher
78
from sentry_sdk.utils import format_timestamp, safe_repr, serialize_attribute
89
from sentry_sdk.envelope import Envelope, Item, PayloadRef
910

1011
if TYPE_CHECKING:
1112
from sentry_sdk._types import Log
1213

1314

14-
class LogBatcher(Batcher):
    """Batcher for Sentry log envelope items.

    Thread management, periodic flushing, and overflow handling all come
    from :py:class:`Batcher`; this subclass only defines how a single log
    is serialized for transport and how dropped logs are reported.
    """

    TYPE = "log"
    CONTENT_TYPE = "application/vnd.sentry.items.log+json"

    def _record_lost(self, item: "Log") -> None:
        # Construct a log envelope item without sending it so the lost bytes
        # can be reported to the record-lost callback.
        log_item = Item(
            type=self.TYPE,
            content_type=self.CONTENT_TYPE,
            headers={
                "item_count": 1,
            },
            payload=PayloadRef(json={"items": [self._to_transport_format(item)]}),
        )

        self._record_lost_func(
            reason="queue_overflow",
            data_category="log_item",
            item=log_item,
            quantity=1,
        )

    @staticmethod
    def _to_transport_format(item: "Log") -> "Any":
        # Mirror the severity fields into attributes unless the caller set
        # them explicitly. NOTE: mutates item["attributes"] in place.
        if "sentry.severity_number" not in item["attributes"]:
            item["attributes"]["sentry.severity_number"] = item["severity_number"]
        if "sentry.severity_text" not in item["attributes"]:
            item["attributes"]["sentry.severity_text"] = item["severity_text"]

        res = {
            # time_unix_nano is nanoseconds; the wire format wants float seconds.
            "timestamp": int(item["time_unix_nano"]) / 1.0e9,
            "trace_id": item.get("trace_id", "00000000-0000-0000-0000-000000000000"),
            "span_id": item.get("span_id"),
            "level": str(item["severity_text"]),
            "body": str(item["body"]),
            "attributes": {
                k: serialize_attribute(v) for (k, v) in item["attributes"].items()
            },
        }

        return res

0 commit comments

Comments
 (0)