diff --git a/specs/core-pipeline/tasks.md b/specs/core-pipeline/tasks.md
index 9457ca0..3362eac 100644
--- a/specs/core-pipeline/tasks.md
+++ b/specs/core-pipeline/tasks.md
@@ -486,12 +486,12 @@ class Sequencer:
 Implement per-partition buffering with size and time-based flushing.
 
 #### Acceptance Criteria
-- [ ] Per-partition buffers (dict[int, list[TypedEvent]])
-- [ ] Size-based flush (default: 100 events)
-- [ ] Time-based flush (default: 5 seconds)
-- [ ] Async background flusher task
-- [ ] Graceful shutdown flushes all buffers
-- [ ] Unit tests with mocked EventStore
+- [x] Per-partition buffers (dict[int, list[TypedEvent]])
+- [x] Size-based flush (default: 100 events)
+- [x] Time-based flush (default: 5 seconds)
+- [x] Async background flusher task
+- [x] Graceful shutdown flushes all buffers
+- [x] Unit tests with mocked EventStore
 
 #### Checklist
 ```python
diff --git a/src/eventkit/errors/__init__.py b/src/eventkit/errors/__init__.py
index 82042ee..6b92ad8 100644
--- a/src/eventkit/errors/__init__.py
+++ b/src/eventkit/errors/__init__.py
@@ -2,6 +2,7 @@
 
 from eventkit.errors.exceptions import (
     AdapterError,
+    BufferFullError,
     EventKitError,
     StorageError,
     ValidationError,
@@ -12,4 +13,5 @@
     "ValidationError",
     "AdapterError",
     "StorageError",
+    "BufferFullError",
 ]
diff --git a/src/eventkit/errors/exceptions.py b/src/eventkit/errors/exceptions.py
index 84e4db6..c1b2158 100644
--- a/src/eventkit/errors/exceptions.py
+++ b/src/eventkit/errors/exceptions.py
@@ -23,3 +23,9 @@ class StorageError(EventKitError):
     """Raised when event storage fails."""
 
     pass
+
+
+class BufferFullError(EventKitError):
+    """Raised when buffer exceeds maximum size."""
+
+    pass
diff --git a/src/eventkit/processing/__init__.py b/src/eventkit/processing/__init__.py
index 021c24b..4ac6083 100644
--- a/src/eventkit/processing/__init__.py
+++ b/src/eventkit/processing/__init__.py
@@ -1,5 +1,6 @@
 """Event processing primitives."""
 
+from eventkit.processing.buffer import Buffer
 from eventkit.processing.sequencer import HashSequencer, Sequencer
 
-__all__ = ["Sequencer", "HashSequencer"]
+__all__ = ["Buffer", "Sequencer", "HashSequencer"]
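`BufferFullError` is exported at the package root so callers can treat a saturated partition as backpressure rather than a crash. Below is a minimal sketch of such a caller; the `ingest` helper and its retry policy are illustrative assumptions, not part of this diff:

```python
import asyncio

from eventkit.errors import BufferFullError
from eventkit.processing import Buffer
from eventkit.schema.events import TypedEvent


async def ingest(buffer: Buffer, event: TypedEvent, partition_id: int) -> None:
    """Enqueue with simple backpressure: brief retries, then surface the error."""
    for attempt in range(3):
        try:
            await buffer.enqueue(event, partition_id)
            return
        except BufferFullError:
            # Flush is failing or slow downstream; back off before retrying.
            await asyncio.sleep(0.1 * (attempt + 1))
    raise BufferFullError(f"partition {partition_id} still full after retries")
```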
diff --git a/src/eventkit/processing/buffer.py b/src/eventkit/processing/buffer.py
new file mode 100644
index 0000000..e37c49b
--- /dev/null
+++ b/src/eventkit/processing/buffer.py
@@ -0,0 +1,173 @@
+"""Per-partition event buffer with size and time-based flushing."""
+
+import asyncio
+from datetime import UTC, datetime
+
+from eventkit.errors import BufferFullError
+from eventkit.processing.buffer_storage import BufferStorage, InMemoryBufferStorage
+from eventkit.schema.events import TypedEvent
+from eventkit.stores.event_store import EventStore
+
+
+class Buffer:
+    """
+    Per-partition event buffer with size and time-based flushing.
+
+    Events are buffered in memory per partition and flushed to storage when:
+    1. Buffer size reaches threshold (default: 100 events)
+    2. Time threshold reached (default: 5 seconds)
+    3. Graceful shutdown (flush all)
+
+    Buffer storage is pluggable - use in-memory (default), disk, Redis, or custom.
+
+    Example:
+        # Default in-memory storage
+        store = FirestoreEventStore(...)
+        buffer = Buffer(event_store=store, size=100, timeout=5.0)
+
+        # Custom disk-backed storage
+        disk_storage = DiskBufferStorage("/tmp/eventkit")
+        buffer = Buffer(event_store=store, storage=disk_storage)
+
+        await buffer.start_flusher()
+
+        # Enqueue events
+        await buffer.enqueue(event, partition_id=7)
+
+        # Graceful shutdown
+        await buffer.stop_flusher()
+    """
+
+    def __init__(
+        self,
+        event_store: EventStore,
+        storage: BufferStorage | None = None,
+        size: int = 100,
+        max_size: int = 1000,
+        timeout: float = 5.0,
+    ) -> None:
+        """
+        Initialize buffer.
+
+        Args:
+            event_store: EventStore for batch writes
+            storage: BufferStorage backend (default: InMemoryBufferStorage)
+            size: Max events per partition before flush (default: 100)
+            max_size: Hard limit per partition (default: 1000, 10x size)
+            timeout: Max seconds between flushes (default: 5.0)
+        """
+        self.event_store = event_store
+        self.storage = storage or InMemoryBufferStorage()
+        self.size = size
+        self.max_size = max_size
+        self.timeout = timeout
+
+        # Per-partition flush tracking
+        self.last_flush: dict[int, datetime] = {}
+
+        # Locking and lifecycle
+        self.lock = asyncio.Lock()
+        self.flusher_task: asyncio.Task[None] | None = None
+        self.shutdown = False
+
+    async def enqueue(self, event: TypedEvent, partition_id: int) -> None:
+        """
+        Enqueue event to partition buffer.
+
+        Flushes immediately if buffer size threshold reached.
+        If buffer is at max_size, attempts flush before raising BufferFullError.
+
+        Args:
+            event: TypedEvent to buffer
+            partition_id: Partition ID (from sequencer)
+
+        Raises:
+            BufferFullError: If partition buffer exceeds max_size and flush fails
+        """
+        async with self.lock:
+            # Defensive: If at max_size, try flushing first
+            if self.storage.len(partition_id) >= self.max_size:
+                try:
+                    await self._flush_partition(partition_id)
+                except Exception as exc:
+                    # Flush failed, buffer truly stuck; keep the root cause chained
+                    raise BufferFullError(
+                        f"Partition {partition_id} buffer full "
+                        f"({self.max_size} events). Flush is failing or too slow."
+                    ) from exc
+
+            await self.storage.append(partition_id, event)
+
+            # Size-based flush
+            if self.storage.len(partition_id) >= self.size:
+                await self._flush_partition(partition_id)
+
+    async def _flush_partition(self, partition_id: int) -> None:
+        """
+        Flush single partition to storage.
+
+        Args:
+            partition_id: Partition to flush
+        """
+        events = await self.storage.get_all(partition_id)
+        if not events:
+            return
+
+        # Batch write to storage
+        await self.event_store.store_batch(events)
+
+        # Clear buffer
+        await self.storage.clear(partition_id)
+        self.last_flush[partition_id] = datetime.now(UTC)
+
+    async def start_flusher(self) -> None:
+        """
+        Start background time-based flusher task.
+
+        Raises:
+            RuntimeError: If flusher is already running
+        """
+        if self.flusher_task is not None:
+            raise RuntimeError("Flusher already running")
+
+        self.shutdown = False
+        self.flusher_task = asyncio.create_task(self._run_flusher())
+
+    async def _run_flusher(self) -> None:
+        """
+        Background task for time-based flushing.
+
+        Flushes all partitions every timeout seconds.
+        """
+        while not self.shutdown:
+            await asyncio.sleep(self.timeout)
+            await self.flush_all()
+
+    async def flush_all(self) -> None:
+        """
+        Flush all partition buffers.
+
+        Used for time-based flush and graceful shutdown.
+        """
+        async with self.lock:
+            for partition_id in self.storage.partitions():
+                await self._flush_partition(partition_id)
+
+    async def stop_flusher(self) -> None:
+        """
+        Stop flusher and flush all remaining events.
+
+        Ensures no events are lost on shutdown.
+        """
+        # Signal shutdown
+        self.shutdown = True
+
+        # Wait for flusher task to complete
+        if self.flusher_task:
+            try:
+                await asyncio.wait_for(self.flusher_task, timeout=2.0)
+            except TimeoutError:
+                self.flusher_task.cancel()
+
+        # Final flush
+        await self.flush_all()
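The `partition_id` passed to `enqueue()` comes from the sequencer layer. A sketch of that wiring, assuming a `HashSequencer` constructed with `num_partitions` and exposing a `partition_for()` method (neither signature is shown in this diff), and any concrete `EventStore`:

```python
from eventkit.processing import Buffer, HashSequencer
from eventkit.schema.events import IdentifyEvent
from eventkit.stores.event_store import EventStore


async def run_pipeline(store: EventStore) -> None:
    sequencer = HashSequencer(num_partitions=16)  # assumed constructor
    buffer = Buffer(event_store=store, size=100, timeout=5.0)

    await buffer.start_flusher()
    try:
        event = IdentifyEvent(user_id="alice")
        partition_id = sequencer.partition_for(event)  # assumed method name
        await buffer.enqueue(event, partition_id=partition_id)
    finally:
        # stop_flusher() performs the final flush, so no event is dropped.
        await buffer.stop_flusher()
```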
diff --git a/src/eventkit/processing/buffer_storage.py b/src/eventkit/processing/buffer_storage.py
new file mode 100644
index 0000000..5d1ad30
--- /dev/null
+++ b/src/eventkit/processing/buffer_storage.py
@@ -0,0 +1,120 @@
+"""Buffer storage backends for pluggable buffer implementations."""
+
+from collections import defaultdict
+from typing import Protocol
+
+from eventkit.schema.events import TypedEvent
+
+
+class BufferStorage(Protocol):
+    """
+    Protocol for buffer storage backends.
+
+    Implementations can be in-memory, disk-backed, Redis, or custom.
+
+    Implementations:
+    - InMemoryBufferStorage (default): Fast, volatile, limited by RAM
+    - DiskBufferStorage (future): Persistent, larger capacity
+    - RedisBufferStorage (future): Distributed, shared across processes
+
+    Example:
+        # Default in-memory
+        storage = InMemoryBufferStorage()
+
+        # Custom disk-backed (user implementation)
+        storage = DiskBufferStorage("/tmp/eventkit_buffers")
+
+        # Use with Buffer
+        buffer = Buffer(event_store=store, storage=storage)
+    """
+
+    async def append(self, partition_id: int, event: TypedEvent) -> None:
+        """
+        Add event to partition buffer.
+
+        Args:
+            partition_id: Partition ID
+            event: Event to buffer
+        """
+        ...
+
+    async def get_all(self, partition_id: int) -> list[TypedEvent]:
+        """
+        Get all events from partition buffer.
+
+        Args:
+            partition_id: Partition ID
+
+        Returns:
+            List of buffered events (may be empty)
+        """
+        ...
+
+    async def clear(self, partition_id: int) -> None:
+        """
+        Clear partition buffer.
+
+        Args:
+            partition_id: Partition ID
+        """
+        ...
+
+    def len(self, partition_id: int) -> int:
+        """
+        Get partition buffer size.
+
+        Args:
+            partition_id: Partition ID
+
+        Returns:
+            Number of events in buffer
+        """
+        ...
+
+    def partitions(self) -> list[int]:
+        """
+        Get all partition IDs with buffered events.
+
+        Returns:
+            List of partition IDs
+        """
+        ...
+
+
+class InMemoryBufferStorage:
+    """
+    In-memory buffer storage (default).
+
+    Fast but volatile - data lost on crash.
+    Limited by available RAM.
+
+    Example:
+        storage = InMemoryBufferStorage()
+        await storage.append(partition_id=0, event=event)
+        events = await storage.get_all(partition_id=0)
+        await storage.clear(partition_id=0)
+    """
+
+    def __init__(self) -> None:
+        """Initialize in-memory storage."""
+        self.buffers: dict[int, list[TypedEvent]] = defaultdict(list)
+
+    async def append(self, partition_id: int, event: TypedEvent) -> None:
+        """Add event to partition buffer."""
+        self.buffers[partition_id].append(event)
+
+    async def get_all(self, partition_id: int) -> list[TypedEvent]:
+        """Get all events from partition buffer."""
+        return list(self.buffers[partition_id])  # copy, so clear() cannot mutate a flushed batch
+
+    async def clear(self, partition_id: int) -> None:
+        """Clear partition buffer."""
+        self.buffers[partition_id].clear()
+
+    def len(self, partition_id: int) -> int:
+        """Get partition buffer size."""
+        return len(self.buffers[partition_id])
+
+    def partitions(self) -> list[int]:
+        """Get all partition IDs with buffered events."""
+        return list(self.buffers.keys())
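Since `BufferStorage` is a `typing.Protocol`, a backend only needs matching method shapes; no inheritance is required. A minimal sketch of a custom backend that wraps the in-memory implementation and counts appends (the `CountingBufferStorage` class is illustrative, not part of this diff):

```python
from eventkit.processing.buffer_storage import InMemoryBufferStorage
from eventkit.schema.events import TypedEvent


class CountingBufferStorage:
    """Delegates to InMemoryBufferStorage while tracking total appends."""

    def __init__(self) -> None:
        self.inner = InMemoryBufferStorage()
        self.total_appended = 0

    async def append(self, partition_id: int, event: TypedEvent) -> None:
        self.total_appended += 1
        await self.inner.append(partition_id, event)

    async def get_all(self, partition_id: int) -> list[TypedEvent]:
        return await self.inner.get_all(partition_id)

    async def clear(self, partition_id: int) -> None:
        await self.inner.clear(partition_id)

    def len(self, partition_id: int) -> int:
        return self.inner.len(partition_id)

    def partitions(self) -> list[int]:
        return self.inner.partitions()
```

An instance passes as `Buffer(event_store=store, storage=CountingBufferStorage())` because the protocol is checked structurally.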
diff --git a/tests/unit/processing/test_buffer.py b/tests/unit/processing/test_buffer.py
new file mode 100644
index 0000000..5421d1c
--- /dev/null
+++ b/tests/unit/processing/test_buffer.py
@@ -0,0 +1,204 @@
+"""Tests for Buffer."""
+
+import asyncio
+from unittest.mock import AsyncMock
+
+import pytest
+
+from eventkit.errors import BufferFullError
+from eventkit.processing.buffer import Buffer
+from eventkit.schema.events import IdentifyEvent
+
+
+@pytest.fixture
+def mock_event_store():
+    """Create a mock EventStore."""
+    store = AsyncMock()
+    store.store_batch = AsyncMock()
+    return store
+
+
+class TestBufferInit:
+    """Tests for Buffer initialization."""
+
+    def test_default_params(self, mock_event_store):
+        """Test default buffer parameters."""
+        buffer = Buffer(event_store=mock_event_store)
+        assert buffer.size == 100
+        assert buffer.max_size == 1000
+        assert buffer.timeout == 5.0
+
+    def test_custom_params(self, mock_event_store):
+        """Test custom buffer parameters."""
+        buffer = Buffer(event_store=mock_event_store, size=50, max_size=500, timeout=10.0)
+        assert buffer.size == 50
+        assert buffer.max_size == 500
+        assert buffer.timeout == 10.0
+
+
+class TestBufferEnqueue:
+    """Tests for Buffer.enqueue() and size-based flush."""
+
+    @pytest.mark.asyncio
+    async def test_basic_enqueue(self, mock_event_store):
+        """Test enqueueing events without flush."""
+        buffer = Buffer(event_store=mock_event_store, size=100)
+
+        event1 = IdentifyEvent(user_id="alice")
+        event2 = IdentifyEvent(user_id="bob")
+
+        await buffer.enqueue(event1, partition_id=0)
+        await buffer.enqueue(event2, partition_id=0)
+
+        # Should not have flushed yet (only 2 events)
+        mock_event_store.store_batch.assert_not_called()
+        assert buffer.storage.len(0) == 2
+
+    @pytest.mark.asyncio
+    async def test_size_based_flush(self, mock_event_store):
+        """Test size-based flush when buffer is full."""
+        buffer = Buffer(event_store=mock_event_store, size=3)
+
+        event1 = IdentifyEvent(user_id="user1")
+        event2 = IdentifyEvent(user_id="user2")
+        event3 = IdentifyEvent(user_id="user3")
+
+        # Enqueue 3 events (reaches size threshold)
+        await buffer.enqueue(event1, partition_id=0)
+        await buffer.enqueue(event2, partition_id=0)
+        await buffer.enqueue(event3, partition_id=0)
+
+        # Should have flushed
+        mock_event_store.store_batch.assert_called_once()
+        assert buffer.storage.len(0) == 0
+
+    @pytest.mark.asyncio
+    async def test_per_partition_isolation(self, mock_event_store):
+        """Test that flushing one partition doesn't affect others."""
+        buffer = Buffer(event_store=mock_event_store, size=2)
+
+        event1 = IdentifyEvent(user_id="user1")
+        event2 = IdentifyEvent(user_id="user2")
+        event3 = IdentifyEvent(user_id="user3")
+
+        # Enqueue to different partitions
+        await buffer.enqueue(event1, partition_id=0)
+        await buffer.enqueue(event2, partition_id=1)
+        await buffer.enqueue(event3, partition_id=0)
+
+        # Partition 0 should flush (size=2), partition 1 should not
+        assert buffer.storage.len(0) == 0  # Flushed
+        assert buffer.storage.len(1) == 1  # Not flushed
+        mock_event_store.store_batch.assert_called_once()
+
+
+class TestBufferFullError:
+    """Tests for BufferFullError."""
+
+    @pytest.mark.asyncio
+    async def test_buffer_full_error(self, mock_event_store):
+        """Test that BufferFullError is raised when max_size exceeded and flush fails."""
+        # Make flush fail
+        mock_event_store.store_batch.side_effect = Exception("Firestore down")
+
+        buffer = Buffer(event_store=mock_event_store, size=100, max_size=5)
+
+        # Enqueue 5 events (at limit)
+        for i in range(5):
+            await buffer.enqueue(IdentifyEvent(user_id=f"user{i}"), partition_id=0)
+
+        # 6th event should trigger flush attempt, which fails, then raise BufferFullError
+        with pytest.raises(BufferFullError, match="Partition 0 buffer full"):
+            await buffer.enqueue(IdentifyEvent(user_id="user6"), partition_id=0)
+
+    @pytest.mark.asyncio
+    async def test_per_partition_limits(self, mock_event_store):
+        """Test that each partition has its own max_size limit."""
+        buffer = Buffer(event_store=mock_event_store, size=100, max_size=5)
+
+        # Fill partition 0
+        for i in range(5):
+            await buffer.enqueue(IdentifyEvent(user_id=f"user{i}"), partition_id=0)
+
+        # Partition 1 should still accept events
+        await buffer.enqueue(IdentifyEvent(user_id="alice"), partition_id=1)
+        assert buffer.storage.len(1) == 1
+
+
+class TestBufferTimeBasedFlush:
+    """Tests for time-based flushing."""
+
+    @pytest.mark.asyncio
+    async def test_time_based_flush(self, mock_event_store):
+        """Test that buffer flushes after timeout."""
+        buffer = Buffer(event_store=mock_event_store, size=100, timeout=0.1)
+
+        await buffer.start_flusher()
+
+        # Enqueue 1 event (below size threshold)
+        await buffer.enqueue(IdentifyEvent(user_id="alice"), partition_id=0)
+
+        # Wait for timeout
+        await asyncio.sleep(0.2)
+
+        # Should have flushed
+        assert mock_event_store.store_batch.called
+        assert buffer.storage.len(0) == 0
+
+        await buffer.stop_flusher()
+
+    @pytest.mark.asyncio
+    async def test_start_flusher_twice_raises_error(self, mock_event_store):
+        """Test that starting flusher twice raises RuntimeError."""
+        buffer = Buffer(event_store=mock_event_store)
+
+        await buffer.start_flusher()
+
+        with pytest.raises(RuntimeError, match="Flusher already running"):
+            await buffer.start_flusher()
+
+        await buffer.stop_flusher()
+
+
+class TestBufferShutdown:
+    """Tests for graceful shutdown."""
+
+    @pytest.mark.asyncio
+    async def test_graceful_shutdown_flushes_all_buffers(self, mock_event_store):
+        """Test that shutdown flushes all buffers."""
+        buffer = Buffer(event_store=mock_event_store, size=100)
+
+        await buffer.start_flusher()
+
+        # Enqueue events to multiple partitions (below size threshold)
+        await buffer.enqueue(IdentifyEvent(user_id="alice"), partition_id=0)
+        await buffer.enqueue(IdentifyEvent(user_id="bob"), partition_id=1)
+
+        # Shutdown
+        await buffer.stop_flusher()
+
+        # All buffers should be empty
+        assert buffer.storage.len(0) == 0
+        assert buffer.storage.len(1) == 0
+        # Should have called store_batch for each partition
+        assert mock_event_store.store_batch.call_count == 2
+
+
+class TestBufferConcurrency:
+    """Tests for concurrent access."""
+
+    @pytest.mark.asyncio
+    async def test_concurrent_enqueue(self, mock_event_store):
+        """Test that multiple tasks can enqueue safely."""
+        buffer = Buffer(event_store=mock_event_store, size=1000)
+
+        # Enqueue 100 events concurrently
+        tasks = [
+            buffer.enqueue(IdentifyEvent(user_id=f"user{i}"), partition_id=i % 16)
+            for i in range(100)
+        ]
+        await asyncio.gather(*tasks)
+
+        # Check all events buffered
+        total = sum(buffer.storage.len(i) for i in range(16))
+        assert total == 100
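In a long-running service, the same start/stop pair hooks into process shutdown. A sketch using a Unix signal handler; the `serve` coroutine and the signal choice are assumptions, not part of this diff:

```python
import asyncio
import signal

from eventkit.processing import Buffer


async def serve(buffer: Buffer) -> None:
    """Run until SIGTERM, then flush everything via stop_flusher()."""
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, stop.set)  # Unix-only API

    await buffer.start_flusher()
    try:
        await stop.wait()  # ingest tasks call buffer.enqueue() elsewhere
    finally:
        # Mirrors the shutdown test: signal the flusher, wait, flush all partitions.
        await buffer.stop_flusher()
```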