Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions specs/core-pipeline/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -486,12 +486,12 @@ class Sequencer:
Implement per-partition buffering with size and time-based flushing.

#### Acceptance Criteria
- [ ] Per-partition buffers (dict[int, list[TypedEvent]])
- [ ] Size-based flush (default: 100 events)
- [ ] Time-based flush (default: 5 seconds)
- [ ] Async background flusher task
- [ ] Graceful shutdown flushes all buffers
- [ ] Unit tests with mocked EventStore
- [x] Per-partition buffers (dict[int, list[TypedEvent]])
- [x] Size-based flush (default: 100 events)
- [x] Time-based flush (default: 5 seconds)
- [x] Async background flusher task
- [x] Graceful shutdown flushes all buffers
- [x] Unit tests with mocked EventStore

#### Checklist
```python
Expand Down
2 changes: 2 additions & 0 deletions src/eventkit/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from eventkit.errors.exceptions import (
AdapterError,
BufferFullError,
EventKitError,
StorageError,
ValidationError,
Expand All @@ -12,4 +13,5 @@
"ValidationError",
"AdapterError",
"StorageError",
"BufferFullError",
]
6 changes: 6 additions & 0 deletions src/eventkit/errors/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ class StorageError(EventKitError):
"""Raised when event storage fails."""

pass


class BufferFullError(EventKitError):
    """Signal that a buffer has grown past its maximum allowed size."""
3 changes: 2 additions & 1 deletion src/eventkit/processing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Event processing primitives."""

from eventkit.processing.buffer import Buffer
from eventkit.processing.sequencer import HashSequencer, Sequencer

__all__ = ["Sequencer", "HashSequencer"]
__all__ = ["Buffer", "Sequencer", "HashSequencer"]
173 changes: 173 additions & 0 deletions src/eventkit/processing/buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Per-partition event buffer with size and time-based flushing."""

import asyncio
import logging
from datetime import UTC, datetime

from eventkit.errors import BufferFullError
from eventkit.processing.buffer_storage import BufferStorage, InMemoryBufferStorage
from eventkit.schema.events import TypedEvent
from eventkit.stores.event_store import EventStore


class Buffer:
    """
    Per-partition event buffer with size and time-based flushing.

    Events are buffered per partition and flushed to the event store when:
    1. Buffer size reaches threshold (default: 100 events)
    2. Time threshold reached (default: 5 seconds)
    3. Graceful shutdown (flush all)

    Buffer storage is pluggable - use in-memory (default), disk, Redis, or custom.

    A partition's buffer is only cleared after ``store_batch`` succeeds, so a
    failed flush leaves the events buffered and they are retried on the next
    flush.

    Example:
        # Default in-memory storage
        store = FirestoreEventStore(...)
        buffer = Buffer(event_store=store, size=100, timeout=5.0)

        # Custom disk-backed storage
        disk_storage = DiskBufferStorage("/tmp/eventkit")
        buffer = Buffer(event_store=store, storage=disk_storage)

        await buffer.start_flusher()

        # Enqueue events
        await buffer.enqueue(event, partition_id=7)

        # Graceful shutdown
        await buffer.stop_flusher()
    """

    def __init__(
        self,
        event_store: EventStore,
        storage: BufferStorage | None = None,
        size: int = 100,
        max_size: int = 1000,
        timeout: float = 5.0,
    ) -> None:
        """
        Initialize buffer.

        Args:
            event_store: EventStore for batch writes
            storage: BufferStorage backend (default: InMemoryBufferStorage)
            size: Max events per partition before flush (default: 100)
            max_size: Hard limit per partition (default: 1000, 10x size)
            timeout: Max seconds between flushes (default: 5.0)
        """
        self.event_store = event_store
        # Compare against None explicitly: a custom backend that happens to be
        # falsy (e.g. defines __len__/__bool__) must not be silently replaced.
        self.storage = storage if storage is not None else InMemoryBufferStorage()
        self.size = size
        self.max_size = max_size
        self.timeout = timeout

        # Per-partition flush tracking (time of the last successful flush)
        self.last_flush: dict[int, datetime] = {}

        # Locking and lifecycle
        self.lock = asyncio.Lock()
        self.flusher_task: asyncio.Task[None] | None = None
        self.shutdown = False

    async def enqueue(self, event: TypedEvent, partition_id: int) -> None:
        """
        Enqueue event to partition buffer.

        Flushes immediately if buffer size threshold reached.
        If buffer is at max_size, attempts flush before raising BufferFullError.

        Args:
            event: TypedEvent to buffer
            partition_id: Partition ID (from sequencer)

        Raises:
            BufferFullError: If partition buffer is at max_size and the flush
                fails. The original flush error is attached as ``__cause__``.
            Exception: Any error from the size-based flush that follows the
                append; the event itself stays buffered in that case.
        """
        async with self.lock:
            # Defensive: If at max_size, try flushing first
            if self.storage.len(partition_id) >= self.max_size:
                try:
                    await self._flush_partition(partition_id)
                except Exception as err:
                    # Flush failed, buffer truly stuck. Chain the cause so the
                    # real storage failure is visible in the traceback.
                    raise BufferFullError(
                        f"Partition {partition_id} buffer full "
                        f"({self.max_size} events). Flush is failing or too slow."
                    ) from err

            await self.storage.append(partition_id, event)

            # Size-based flush
            if self.storage.len(partition_id) >= self.size:
                await self._flush_partition(partition_id)

    async def _flush_partition(self, partition_id: int) -> None:
        """
        Flush single partition to storage.

        Callers in this class invoke this while holding ``self.lock``.

        Args:
            partition_id: Partition to flush
        """
        events = await self.storage.get_all(partition_id)
        if not events:
            return

        # Batch write to storage
        await self.event_store.store_batch(events)

        # Clear buffer only after the write succeeded
        await self.storage.clear(partition_id)
        self.last_flush[partition_id] = datetime.now(UTC)

    async def start_flusher(self) -> None:
        """
        Start background time-based flusher task.

        May be called again after ``stop_flusher()`` has completed.

        Raises:
            RuntimeError: If flusher is already running
        """
        # A finished or cancelled task no longer counts as running, so the
        # flusher can be restarted after a stop.
        if self.flusher_task is not None and not self.flusher_task.done():
            raise RuntimeError("Flusher already running")

        self.shutdown = False
        self.flusher_task = asyncio.create_task(self._run_flusher())

    async def _run_flusher(self) -> None:
        """
        Background task for time-based flushing.

        Flushes all partitions every timeout seconds. A failed flush is logged
        and retried on the next tick instead of terminating the task.
        """
        while not self.shutdown:
            await asyncio.sleep(self.timeout)
            try:
                await self.flush_all()
            except Exception:
                # Keep the loop alive: unflushed events remain buffered and
                # are retried on the next iteration. CancelledError is not an
                # Exception subclass, so cancellation still propagates.
                logging.getLogger(__name__).exception(
                    "Time-based flush failed; retrying in %s seconds", self.timeout
                )

    async def flush_all(self) -> None:
        """
        Flush all partition buffers.

        Used for time-based flush and graceful shutdown. Every partition is
        attempted even if an earlier one fails.

        Raises:
            Exception: The first flush error encountered, re-raised after all
                partitions have been attempted.
        """
        async with self.lock:
            first_error: Exception | None = None
            for partition_id in self.storage.partitions():
                try:
                    await self._flush_partition(partition_id)
                except Exception as err:
                    if first_error is None:
                        first_error = err
                    else:
                        # Only the first error is re-raised; log the rest so
                        # they are not lost.
                        logging.getLogger(__name__).exception(
                            "Flush failed for partition %s", partition_id
                        )
            if first_error is not None:
                raise first_error

    async def stop_flusher(self) -> None:
        """
        Stop flusher and flush all remaining events.

        Waits up to 2 seconds for the background task to finish, then performs
        a final flush of every partition.

        Raises:
            Exception: The first error from the final flush, if any.
        """
        # Signal shutdown
        self.shutdown = True

        # Wait for flusher task to complete
        task = self.flusher_task
        if task is not None:
            try:
                await asyncio.wait_for(task, timeout=2.0)
            except TimeoutError:
                # wait_for() cancels the task on timeout; the explicit cancel
                # is kept as a harmless safeguard.
                task.cancel()

        # Final flush
        await self.flush_all()
120 changes: 120 additions & 0 deletions src/eventkit/processing/buffer_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Buffer storage backends for pluggable buffer implementations."""

from collections import defaultdict
from typing import Protocol

from eventkit.schema.events import TypedEvent


class BufferStorage(Protocol):
    """
    Structural interface every buffer storage backend must satisfy.

    A backend keeps one list of events per partition ID. Backends may be
    in-memory, disk-backed, Redis-based, or custom.

    Implementations:
        - InMemoryBufferStorage (default): Fast, volatile, limited by RAM
        - DiskBufferStorage (future): Persistent, larger capacity
        - RedisBufferStorage (future): Distributed, shared across processes

    Example:
        # Default in-memory
        storage = InMemoryBufferStorage()

        # Custom disk-backed (user implementation)
        storage = DiskBufferStorage("/tmp/eventkit_buffers")

        # Use with Buffer
        buffer = Buffer(event_store=store, storage=storage)
    """

    async def append(self, partition_id: int, event: TypedEvent) -> None:
        """
        Append one event to the buffer of the given partition.

        Args:
            partition_id: Partition the event belongs to
            event: Event to add to that partition's buffer
        """
        ...

    async def get_all(self, partition_id: int) -> list[TypedEvent]:
        """
        Return every event currently buffered for the given partition.

        Args:
            partition_id: Partition to read

        Returns:
            The buffered events; an empty list if nothing is buffered
        """
        ...

    async def clear(self, partition_id: int) -> None:
        """
        Drop all buffered events of the given partition.

        Args:
            partition_id: Partition to empty
        """
        ...

    def len(self, partition_id: int) -> int:
        """
        Report how many events are buffered for the given partition.

        Args:
            partition_id: Partition to measure

        Returns:
            Count of buffered events
        """
        ...

    def partitions(self) -> list[int]:
        """
        List the IDs of all partitions that have buffered events.

        Returns:
            Partition IDs
        """
        ...


class InMemoryBufferStorage:
    """
    In-memory buffer storage (default).

    Fast but volatile - data lost on crash.
    Limited by available RAM.

    Read-only operations (``get_all``, ``len``, ``partitions``) never create
    entries for partitions that were not written to.

    Example:
        storage = InMemoryBufferStorage()
        await storage.append(partition_id=0, event=event)
        events = await storage.get_all(partition_id=0)
        await storage.clear(partition_id=0)
    """

    def __init__(self) -> None:
        """Initialize in-memory storage."""
        # Maps partition ID -> events buffered for that partition.
        self.buffers: dict[int, list[TypedEvent]] = defaultdict(list)

    async def append(self, partition_id: int, event: TypedEvent) -> None:
        """
        Add event to partition buffer.

        Args:
            partition_id: Partition ID
            event: Event to buffer
        """
        self.buffers[partition_id].append(event)

    async def get_all(self, partition_id: int) -> list[TypedEvent]:
        """
        Get all events from partition buffer.

        Args:
            partition_id: Partition ID

        Returns:
            A new list with the buffered events (may be empty). It is a
            snapshot, so a later ``clear()`` does not empty it.
        """
        # .get() avoids the defaultdict inserting an empty entry on read;
        # list() copies so callers never hold the internal list.
        return list(self.buffers.get(partition_id, ()))

    async def clear(self, partition_id: int) -> None:
        """
        Clear partition buffer.

        Args:
            partition_id: Partition ID
        """
        # Remove the key entirely so emptied partitions do not accumulate.
        self.buffers.pop(partition_id, None)

    def len(self, partition_id: int) -> int:
        """
        Get partition buffer size.

        Args:
            partition_id: Partition ID

        Returns:
            Number of events in buffer (0 for an unknown partition)
        """
        return len(self.buffers.get(partition_id, ()))

    def partitions(self) -> list[int]:
        """
        Get all partition IDs with buffered events.

        Returns:
            List of partition IDs whose buffer is non-empty
        """
        return [pid for pid, events in self.buffers.items() if events]
Loading
Loading