diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c022b9e..ea16386 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,6 +16,13 @@ repos: types: [python] pass_filenames: false + - id: mypy + name: mypy type check + entry: uv run mypy src/eventkit + language: system + types: [python] + pass_filenames: false + - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 hooks: diff --git a/README.md b/README.md index 12770b0..a889acd 100644 --- a/README.md +++ b/README.md @@ -214,24 +214,52 @@ settings = Settings( ## Development -**Run tests:** +**Setup:** ```bash -pytest +# Install dependencies +uv sync --all-extras + +# Install pre-commit hooks (one-time setup) +uv run pre-commit install +``` + +**Run unit tests (fast, no Docker):** +```bash +uv run pytest +``` + +**Run integration tests (requires Docker):** +```bash +# Start Firestore emulator +docker compose up -d + +# Run integration tests +export FIRESTORE_EMULATOR_HOST=localhost:8080 +uv run pytest -m integration + +# Stop emulator +docker compose down +``` + +**Run all tests:** +```bash +export FIRESTORE_EMULATOR_HOST=localhost:8080 +uv run pytest -m "" ``` **Type check:** ```bash -mypy src/eventkit +uv run mypy src/eventkit ``` **Lint:** ```bash -ruff check src/ +uv run ruff check src/ ``` **Format:** ```bash -ruff format src/ +uv run ruff format src/ ``` ## Roadmap diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..04cd1ba --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,17 @@ +# Docker Compose for local development and testing +# Runs Firestore emulator for integration tests + +services: + firestore-emulator: + image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators + command: gcloud emulators firestore start --host-port=0.0.0.0:8080 + ports: + - "8080:8080" + environment: + - FIRESTORE_PROJECT_ID=test-project + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080"] + interval: 5s + timeout: 5s + 
retries: 10 + start_period: 10s diff --git a/pyproject.toml b/pyproject.toml index 32e4e2d..c415d23 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "uvicorn[standard]>=0.24.0", "google-cloud-firestore>=2.13.0", "structlog>=23.2.0", + "tenacity>=8.2.0", ] [project.optional-dependencies] diff --git a/specs/core-pipeline/tasks.md b/specs/core-pipeline/tasks.md index d1d3c6c..c26f3e8 100644 --- a/specs/core-pipeline/tasks.md +++ b/specs/core-pipeline/tasks.md @@ -213,13 +213,13 @@ class ErrorStore(Protocol): Implement EventStore and ErrorStore using Firestore with batch operations and retry logic. #### Acceptance Criteria -- [ ] FirestoreEventStore implements EventStore Protocol -- [ ] Batch writes (max 500 docs per batch) -- [ ] Document ID: `{stream}/{timestamp}_{uuid}` -- [ ] Retry logic for transient errors -- [ ] FirestoreErrorStore implements ErrorStore Protocol -- [ ] Unit tests with mocked Firestore client -- [ ] Integration tests with Firestore emulator +- [x] FirestoreEventStore implements EventStore Protocol +- [x] Batch writes (max 500 docs per batch) +- [x] Document ID: `{stream}/{timestamp}_{uuid}` +- [x] Retry logic for transient errors +- [x] FirestoreErrorStore implements ErrorStore Protocol +- [x] Unit tests with mocked Firestore client +- [x] Integration tests with Firestore emulator #### Checklist ```python diff --git a/src/eventkit/schema/events.py b/src/eventkit/schema/events.py index 626123b..37f26d0 100644 --- a/src/eventkit/schema/events.py +++ b/src/eventkit/schema/events.py @@ -38,6 +38,7 @@ class TypedEvent(BaseModel): timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) event_type: str properties: dict[str, Any] = Field(default_factory=dict) + stream: str | None = None # Stream identifier for routing # PostHog-style property updates set: dict[str, Any] | None = Field(default=None, alias="$set") diff --git a/src/eventkit/stores/firestore.py b/src/eventkit/stores/firestore.py new file mode 
"""
Firestore implementation of event and error storage.

Provides concrete implementations of EventStore and ErrorStore protocols
using Google Cloud Firestore, with batching, retry logic, and stream isolation.
"""

import asyncio
from datetime import datetime
from typing import Any
from uuid import uuid4

from google.api_core.exceptions import (
    DeadlineExceeded,
    InternalServerError,
    ServiceUnavailable,
)
from google.cloud import firestore
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from eventkit.schema.events import TypedEvent

# Transient Firestore errors that are safe to retry; anything else fails fast.
_TRANSIENT_ERRORS = (DeadlineExceeded, ServiceUnavailable, InternalServerError)

# Shared retry policy: 3 attempts total, exponential backoff clamped to 1-10s.
# Defined once so every write/read path uses an identical policy.
_firestore_retry = retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(_TRANSIENT_ERRORS),
)


class FirestoreEventStore:
    """
    Firestore-backed event storage with batch operations and retry logic.

    Stores events in subcollections per stream for isolation:
        events/{stream}/items/{timestamp_iso}_{uuid}

    Features:
    - Async wrapper around sync Firestore client (via asyncio.to_thread)
    - Automatic batching for >500 events (Firestore batch limit)
    - Exponential backoff retries for transient errors
    - Stream-based isolation

    Args:
        project_id: GCP project ID
        database: Firestore database name (default: "default").
            NOTE(review): Firestore's built-in default database is named
            "(default)"; confirm "default" targets the intended database.

    Example:
        store = FirestoreEventStore(project_id="my-project")
        await store.store(event)
        events = await store.read(stream="mobile", limit=100)
    """

    def __init__(self, project_id: str, database: str = "default"):
        """Initialize the synchronous Firestore client."""
        self.db = firestore.Client(project=project_id, database=database)

    def _get_doc_ref(self, event: TypedEvent) -> firestore.DocumentReference:
        """
        Get the document reference for an event in its stream subcollection.

        Document path: events/{stream}/items/{timestamp_iso}_{event_id}

        Args:
            event: The event to get a reference for

        Returns:
            Firestore document reference
        """
        # Events without a stream (or stream=None/"") land in "default".
        stream = getattr(event, "stream", None) or "default"
        doc_id = f"{event.timestamp.isoformat()}_{event.event_id}"
        return (
            self.db.collection("events")
            .document(stream)
            .collection("items")
            .document(doc_id)
        )

    def _event_to_dict(self, event: TypedEvent) -> dict[str, Any]:
        """
        Serialize an event to a Firestore-compatible dict.

        The timestamp is stored as an ISO-8601 string so reads can use a
        lexicographic order_by("timestamp") (timestamps are produced in UTC,
        so string order matches chronological order).

        Args:
            event: The event to serialize

        Returns:
            Dictionary representation suitable for Firestore
        """
        data = event.model_dump()
        if "timestamp" in data and isinstance(data["timestamp"], datetime):
            data["timestamp"] = data["timestamp"].isoformat()
        return data

    async def store(self, event: TypedEvent) -> None:
        """
        Store a single typed event.

        Runs the synchronous Firestore write in a worker thread so the
        async event loop is never blocked.

        Args:
            event: The event to store

        Raises:
            tenacity.RetryError: if the write still fails after 3 attempts
        """
        await asyncio.to_thread(self._sync_store, event)

    def _sync_store(self, event: TypedEvent) -> None:
        """Synchronous single-event write with retry."""
        self._write_with_retry(self._get_doc_ref(event), self._event_to_dict(event))

    @_firestore_retry
    def _write_with_retry(
        self, doc_ref: firestore.DocumentReference, data: dict[str, Any]
    ) -> None:
        """Write one document, retrying transient errors with backoff."""
        doc_ref.set(data)

    async def store_batch(self, events: list[TypedEvent]) -> None:
        """
        Store a batch of typed events.

        Automatically chunks into groups of 500 (Firestore batch limit).
        Each chunk is committed atomically, but chunks are NOT atomic across
        each other.

        Args:
            events: List of events to store

        Raises:
            tenacity.RetryError: if any chunk still fails after 3 attempts
        """
        await asyncio.to_thread(self._sync_store_batch, events)

    def _sync_store_batch(self, events: list[TypedEvent]) -> None:
        """Synchronous batch write with 500-document chunking and retry."""
        for start in range(0, len(events), 500):
            batch = self.db.batch()
            for event in events[start : start + 500]:
                batch.set(self._get_doc_ref(event), self._event_to_dict(event))
            self._write_batch_with_retry(batch)

    @_firestore_retry
    def _write_batch_with_retry(self, batch: firestore.WriteBatch) -> None:
        """Commit one batch, retrying transient errors with backoff."""
        batch.commit()

    async def read(self, stream: str, limit: int = 100) -> list[TypedEvent]:
        """
        Read events from a specific stream.

        Returns events in reverse chronological order (newest first).

        Args:
            stream: The stream name to read from
            limit: Maximum number of events to return (default: 100)

        Returns:
            List of TypedEvent objects from the stream

        Raises:
            tenacity.RetryError: if the query still fails after 3 attempts
        """
        return await asyncio.to_thread(self._sync_read, stream, limit)

    def _sync_read(self, stream: str, limit: int) -> list[TypedEvent]:
        """Synchronous read with retry."""
        docs = self._read_with_retry(stream, limit)
        return [self._dict_to_event(doc.to_dict()) for doc in docs]

    @_firestore_retry
    def _read_with_retry(self, stream: str, limit: int) -> list[Any]:
        """
        Query Firestore with exponential backoff retry.

        BUG FIX: Query.stream() is lazy — the RPCs happen while iterating,
        which previously occurred in _sync_read, OUTSIDE this retried
        function, so transient errors during the read were never retried.
        Materializing with list() keeps the network iteration inside the
        retry scope.
        """
        return list(
            self.db.collection("events")
            .document(stream)
            .collection("items")
            .order_by("timestamp", direction=firestore.Query.DESCENDING)
            .limit(limit)
            .stream()
        )

    def _dict_to_event(self, data: dict[str, Any]) -> TypedEvent:
        """
        Deserialize a Firestore dict to a TypedEvent.

        Converts ISO timestamp strings back to datetime objects and maps
        event_type to the concrete event class.

        Args:
            data: Dictionary from Firestore

        Returns:
            TypedEvent subclass based on event_type

        Raises:
            ValueError: If event_type is not recognized
        """
        # Imported locally to keep module import lightweight / avoid cycles.
        from eventkit.schema.events import IdentifyEvent, PageEvent, TrackEvent

        if "timestamp" in data and isinstance(data["timestamp"], str):
            data["timestamp"] = datetime.fromisoformat(data["timestamp"])

        event_type = data.get("event_type")
        if event_type == "identify":
            return IdentifyEvent(**data)
        elif event_type == "track":
            return TrackEvent(**data)
        elif event_type == "page":
            return PageEvent(**data)
        else:
            raise ValueError(f"Unknown event_type: {event_type}")


class FirestoreErrorStore:
    """
    Firestore-backed error storage for dead letter queue.

    Stores failed events and their error details in a flat collection:
        errors/{timestamp_iso}_{uuid}

    Features:
    - Async wrapper around sync Firestore client
    - Captures raw payload, error message, timestamp, and metadata
    - Simple flat structure for easy querying

    Args:
        project_id: GCP project ID
        database: Firestore database name (default: "default").
            NOTE(review): see FirestoreEventStore — Firestore's built-in
            default database is named "(default)".

    Example:
        store = FirestoreErrorStore(project_id="my-project")
        await store.store_error(
            payload={"userId": "123"},
            error="Validation failed",
            timestamp=datetime.now(UTC),
            metadata={"stream": "mobile"},
        )
        errors = await store.query_errors(limit=10)
    """

    def __init__(self, project_id: str, database: str = "default"):
        """Initialize the synchronous Firestore client."""
        self.db = firestore.Client(project=project_id, database=database)

    async def store_error(
        self,
        payload: dict[str, Any],
        error: str,
        timestamp: datetime,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """
        Store a failed event with error details.

        Args:
            payload: The original raw event payload that failed
            error: Error message describing why the event failed
            timestamp: When the error occurred
            metadata: Optional additional context (e.g. stream, source)

        Raises:
            tenacity.RetryError: if the write still fails after 3 attempts
        """
        await asyncio.to_thread(self._sync_store_error, payload, error, timestamp, metadata)

    def _sync_store_error(
        self,
        payload: dict[str, Any],
        error: str,
        timestamp: datetime,
        metadata: dict[str, Any] | None,
    ) -> None:
        """Synchronous error write with retry."""
        doc_id = f"{timestamp.isoformat()}_{uuid4()}"
        doc_data = {
            "payload": payload,
            "error": error,
            "timestamp": timestamp.isoformat(),
            # Surface the stream as a top-level field for direct querying.
            "stream": metadata.get("stream") if metadata else None,
            "metadata": metadata or {},
        }
        self._write_error_with_retry(self.db.collection("errors").document(doc_id), doc_data)

    @_firestore_retry
    def _write_error_with_retry(
        self, doc_ref: firestore.DocumentReference, data: dict[str, Any]
    ) -> None:
        """Write one error document, retrying transient errors with backoff."""
        doc_ref.set(data)

    async def query_errors(self, limit: int = 100) -> list[dict[str, Any]]:
        """
        Query stored errors.

        Returns errors in reverse chronological order (newest first).

        Args:
            limit: Maximum number of errors to return (default: 100)

        Returns:
            List of error documents with payload, error, timestamp, and metadata

        Raises:
            tenacity.RetryError: if the query still fails after 3 attempts
        """
        return await asyncio.to_thread(self._sync_query_errors, limit)

    def _sync_query_errors(self, limit: int) -> list[dict[str, Any]]:
        """Synchronous error query with retry."""
        docs = self._query_errors_with_retry(limit)
        return [doc.to_dict() for doc in docs]

    @_firestore_retry
    def _query_errors_with_retry(self, limit: int) -> list[Any]:
        """
        Query the errors collection with exponential backoff retry.

        BUG FIX: materialized with list() so the lazy .stream() iteration
        runs inside the retried function (see _read_with_retry).
        """
        return list(
            self.db.collection("errors")
            .order_by("timestamp", direction=firestore.Query.DESCENDING)
            .limit(limit)
            .stream()
        )
"""Integration tests for Firestore storage using emulator."""

import os
from datetime import UTC, datetime

import pytest

from eventkit.schema.events import TrackEvent
from eventkit.stores.firestore import FirestoreErrorStore, FirestoreEventStore


@pytest.fixture
def check_emulator():
    """Skip the test unless a Firestore emulator is configured."""
    emulator_host = os.getenv("FIRESTORE_EMULATOR_HOST")
    if not emulator_host:
        pytest.skip("FIRESTORE_EMULATOR_HOST not set. Start with: docker compose up")


@pytest.fixture
def event_store(check_emulator):
    """FirestoreEventStore connected to the emulator.

    BUG FIX: this was an ``async def`` fixture under plain ``@pytest.fixture``;
    unless pytest-asyncio runs in auto mode (or the fixture is registered via
    ``pytest_asyncio.fixture``) the test receives an un-awaited coroutine, not
    the store. The fixture awaits nothing, so it is now synchronous.
    """
    return FirestoreEventStore(project_id="test-project")


@pytest.fixture
def error_store(check_emulator):
    """FirestoreErrorStore connected to the emulator (sync; see event_store)."""
    return FirestoreErrorStore(project_id="test-project")


@pytest.mark.asyncio
@pytest.mark.integration
async def test_write_and_read_events(event_store):
    """Test end-to-end: write 100 events, read them back."""
    stream = "mobile-integration-test"

    # Create 100 track events
    events = [
        TrackEvent(
            user_id=f"user{i}",
            event_name=f"test_event_{i}",
            properties={"index": i},
            stream=stream,
        )
        for i in range(100)
    ]

    # Write batch
    await event_store.store_batch(events)

    # Read back
    retrieved_events = await event_store.read(stream=stream, limit=100)

    # Verify
    assert len(retrieved_events) == 100

    # Should be in reverse chronological order (newest first).
    # Since they were created in the same batch, order may vary,
    # but all should be present.
    retrieved_user_ids = {e.user_id for e in retrieved_events}
    expected_user_ids = {f"user{i}" for i in range(100)}
    assert retrieved_user_ids == expected_user_ids


@pytest.mark.asyncio
@pytest.mark.integration
async def test_stream_isolation(event_store):
    """Test that streams are isolated (mobile vs web)."""
    mobile_stream = "mobile-isolation-test"
    web_stream = "web-isolation-test"

    # Write 50 events to mobile stream
    mobile_events = [
        TrackEvent(
            user_id=f"mobile_user{i}",
            event_name="mobile_event",
            stream=mobile_stream,
        )
        for i in range(50)
    ]
    await event_store.store_batch(mobile_events)

    # Write 30 events to web stream
    web_events = [
        TrackEvent(
            user_id=f"web_user{i}",
            event_name="web_event",
            stream=web_stream,
        )
        for i in range(30)
    ]
    await event_store.store_batch(web_events)

    # Query mobile stream - should only get mobile events
    mobile_retrieved = await event_store.read(stream=mobile_stream, limit=100)
    assert len(mobile_retrieved) == 50
    assert all("mobile_user" in e.user_id for e in mobile_retrieved)

    # Query web stream - should only get web events
    web_retrieved = await event_store.read(stream=web_stream, limit=100)
    assert len(web_retrieved) == 30
    assert all("web_user" in e.user_id for e in web_retrieved)


@pytest.mark.asyncio
@pytest.mark.integration
async def test_error_store_persistence(error_store):
    """Test error storage and retrieval."""
    # Store 5 failed events with different errors
    for i in range(5):
        payload = {"userId": f"user{i}", "type": "track"}
        error_msg = f"Validation error {i}: missing required field"
        timestamp = datetime.now(UTC)
        metadata = {"stream": f"stream{i}"}

        await error_store.store_error(payload, error_msg, timestamp, metadata)

    # Query errors
    errors = await error_store.query_errors(limit=10)

    # Verify
    assert len(errors) >= 5  # May have errors from other tests

    # Find our errors
    our_errors = [e for e in errors if "Validation error" in e["error"]]
    assert len(our_errors) == 5

    # Verify structure
    for error in our_errors:
        assert "payload" in error
        assert "error" in error
        assert "timestamp" in error
        assert "stream" in error
        assert error["payload"]["type"] == "track"
"""Unit tests for Firestore storage implementation."""

from datetime import UTC, datetime
from unittest.mock import Mock, patch

import pytest
from google.cloud import firestore

from eventkit.schema.events import TrackEvent
from eventkit.stores.firestore import FirestoreErrorStore, FirestoreEventStore


def _mock_firestore_doc_ref(mock_client_class, mock_doc_ref):
    """Helper to set up deeply nested mock chain for Firestore document refs.

    Mirrors the events/{stream}/items/{doc_id} access pattern:
    client.collection(...).document(...).collection(...).document(...).
    """
    (
        mock_client_class.return_value.collection.return_value.document.return_value.collection.return_value.document.return_value
    ) = mock_doc_ref


class TestFirestoreEventStoreSetup:
    """Tests for FirestoreEventStore initialization and helper methods."""

    @patch("eventkit.stores.firestore.firestore.Client")
    def test_init_creates_client(self, mock_client_class):
        """FirestoreEventStore should initialize Firestore client."""
        store = FirestoreEventStore(project_id="test-project")

        mock_client_class.assert_called_once_with(project="test-project", database="default")
        assert store.db == mock_client_class.return_value

    @patch("eventkit.stores.firestore.firestore.Client")
    def test_init_with_custom_database(self, mock_client_class):
        """FirestoreEventStore should accept custom database name."""
        FirestoreEventStore(project_id="test-project", database="custom-db")

        mock_client_class.assert_called_once_with(project="test-project", database="custom-db")


class TestFirestoreHelperMethods:
    """Tests for serialization and path construction helpers."""

    @patch("eventkit.stores.firestore.firestore.Client")
    def test_get_doc_ref_constructs_correct_path(self, mock_client_class):
        """_get_doc_ref should construct events/{stream}/items/{id} path."""
        store = FirestoreEventStore(project_id="test")
        event = TrackEvent(
            user_id="user123",
            event_name="test_event",
            event_id="event-123",
            timestamp=datetime(2026, 1, 10, 12, 0, 0, tzinfo=UTC),
        )
        # Add stream attribute
        event.stream = "mobile"

        mock_db = mock_client_class.return_value
        mock_collection = Mock()
        mock_stream_doc = Mock()
        mock_items_collection = Mock()
        mock_doc = Mock()

        mock_db.collection.return_value = mock_collection
        mock_collection.document.return_value = mock_stream_doc
        mock_stream_doc.collection.return_value = mock_items_collection
        mock_items_collection.document.return_value = mock_doc

        result = store._get_doc_ref(event)

        # Verify correct path construction
        mock_db.collection.assert_called_once_with("events")
        mock_collection.document.assert_called_once_with("mobile")
        mock_stream_doc.collection.assert_called_once_with("items")
        expected_doc_id = "2026-01-10T12:00:00+00:00_event-123"
        mock_items_collection.document.assert_called_once_with(expected_doc_id)
        assert result == mock_doc

    @patch("eventkit.stores.firestore.firestore.Client")
    def test_get_doc_ref_uses_default_stream(self, mock_client_class):
        """_get_doc_ref should use 'default' stream if none specified."""
        store = FirestoreEventStore(project_id="test")
        event = TrackEvent(
            user_id="user123",
            event_name="test_event",
        )
        # No stream attribute

        mock_db = mock_client_class.return_value
        mock_collection = Mock()
        mock_stream_doc = Mock()

        mock_db.collection.return_value = mock_collection
        mock_collection.document.return_value = mock_stream_doc
        mock_stream_doc.collection.return_value = Mock()

        store._get_doc_ref(event)

        # Should use "default" stream
        mock_collection.document.assert_called_once_with("default")

    @patch("eventkit.stores.firestore.firestore.Client")
    def test_event_to_dict_serializes_correctly(self, mock_client_class):
        """_event_to_dict should convert event to Firestore-compatible dict."""
        store = FirestoreEventStore(project_id="test")
        event = TrackEvent(
            user_id="user123",
            event_name="Button Clicked",
            properties={"button_id": "submit"},
            timestamp=datetime(2026, 1, 10, 12, 0, 0, tzinfo=UTC),
        )

        result = store._event_to_dict(event)

        # Check timestamp is ISO string
        assert isinstance(result["timestamp"], str)
        assert result["timestamp"] == "2026-01-10T12:00:00+00:00"

        # Check other fields preserved
        assert result["user_id"] == "user123"
        assert result["event_name"] == "Button Clicked"
        assert result["event_type"] == "track"
        assert result["properties"] == {"button_id": "submit"}

    @patch("eventkit.stores.firestore.firestore.Client")
    def test_event_to_dict_handles_none_user_id(self, mock_client_class):
        """_event_to_dict should handle events with no user_id."""
        store = FirestoreEventStore(project_id="test")
        event = TrackEvent(
            event_name="Anonymous Event",
        )

        result = store._event_to_dict(event)

        assert result["user_id"] is None
        assert result["event_name"] == "Anonymous Event"


class TestFirestoreEventStoreWrite:
    """Tests for write operations (store, store_batch)."""

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_store_writes_single_event(self, mock_client_class):
        """store() should write event to correct path."""
        store = FirestoreEventStore(project_id="test")
        event = TrackEvent(
            user_id="user123",
            event_name="test_event",
            stream="mobile",
        )

        # Mock the document reference chain
        mock_doc_ref = Mock()
        _mock_firestore_doc_ref(mock_client_class, mock_doc_ref)

        await store.store(event)

        # Verify set was called
        mock_doc_ref.set.assert_called_once()
        call_args = mock_doc_ref.set.call_args[0][0]
        assert call_args["user_id"] == "user123"
        assert call_args["event_name"] == "test_event"

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_store_batch_chunks_correctly(self, mock_client_class):
        """store_batch() should chunk >500 events into multiple batches."""
        store = FirestoreEventStore(project_id="test")

        # Create 501 events (should create 2 batches: 500 + 1)
        events = [TrackEvent(user_id=f"user{i}", event_name=f"event{i}") for i in range(501)]

        mock_batch = Mock()
        mock_client_class.return_value.batch.return_value = mock_batch

        # Mock document reference
        mock_doc_ref = Mock()
        _mock_firestore_doc_ref(mock_client_class, mock_doc_ref)

        await store.store_batch(events)

        # Should have created 2 batches
        assert mock_client_class.return_value.batch.call_count == 2

        # Should have committed 2 batches
        assert mock_batch.commit.call_count == 2

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_store_batch_calls_set_for_each_event(self, mock_client_class):
        """store_batch() should call batch.set() for each event."""
        store = FirestoreEventStore(project_id="test")
        events = [
            TrackEvent(user_id="user1", event_name="event1"),
            TrackEvent(user_id="user2", event_name="event2"),
        ]

        mock_batch = Mock()
        mock_client_class.return_value.batch.return_value = mock_batch

        mock_doc_ref = Mock()
        _mock_firestore_doc_ref(mock_client_class, mock_doc_ref)

        await store.store_batch(events)

        # Should call batch.set() twice (once per event)
        assert mock_batch.set.call_count == 2
        # Should commit once (only 2 events)
        mock_batch.commit.assert_called_once()


class TestFirestoreRetryLogic:
    """Tests for retry logic with transient errors."""

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_retry_on_deadline_exceeded(self, mock_client_class):
        """_write_with_retry should retry on DeadlineExceeded."""
        from google.api_core.exceptions import DeadlineExceeded

        store = FirestoreEventStore(project_id="test")
        event = TrackEvent(user_id="user1", event_name="event1")

        mock_doc_ref = Mock()
        _mock_firestore_doc_ref(mock_client_class, mock_doc_ref)

        # Fail twice, then succeed
        mock_doc_ref.set.side_effect = [
            DeadlineExceeded("timeout"),
            DeadlineExceeded("timeout"),
            None,  # Success
        ]

        await store.store(event)

        # Should have attempted 3 times in total (initial call + 2 retries)
        assert mock_doc_ref.set.call_count == 3

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_retry_exhaustion_raises_error(self, mock_client_class):
        """_write_with_retry should raise after 3 failed attempts."""
        from google.api_core.exceptions import ServiceUnavailable
        from tenacity import RetryError

        store = FirestoreEventStore(project_id="test")
        event = TrackEvent(user_id="user1", event_name="event1")

        mock_doc_ref = Mock()
        _mock_firestore_doc_ref(mock_client_class, mock_doc_ref)

        # Always fail
        mock_doc_ref.set.side_effect = ServiceUnavailable("service down")

        with pytest.raises(RetryError):
            await store.store(event)

        # Should have exhausted all 3 attempts
        assert mock_doc_ref.set.call_count == 3


class TestFirestoreEventStoreRead:
    """Tests for read operations."""

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_read_returns_events_from_stream(self, mock_client_class):
        """read() should query correct stream and deserialize events."""
        store = FirestoreEventStore(project_id="test")

        # Mock Firestore query chain
        mock_stream = Mock()
        mock_limit = Mock()
        mock_order = Mock()
        mock_collection = Mock()
        mock_doc = Mock()
        mock_items = Mock()

        mock_client_class.return_value.collection.return_value = mock_collection
        mock_collection.document.return_value = mock_doc
        mock_doc.collection.return_value = mock_items
        mock_items.order_by.return_value = mock_order
        mock_order.limit.return_value = mock_limit
        mock_limit.stream.return_value = mock_stream

        # Mock document data
        mock_event_doc = Mock()
        mock_event_doc.to_dict.return_value = {
            "event_id": "evt-1",
            "user_id": "user1",
            "event_name": "test_event",
            "event_type": "track",
            "timestamp": "2026-01-10T12:00:00+00:00",
            "properties": {},
            "anonymous_id": None,
            "stream": None,
        }
        mock_stream.__iter__ = Mock(return_value=iter([mock_event_doc]))

        events = await store.read(stream="mobile", limit=50)

        # Verify correct path
        mock_collection.document.assert_called_once_with("mobile")
        mock_items.order_by.assert_called_once_with(
            "timestamp", direction=firestore.Query.DESCENDING
        )
        mock_order.limit.assert_called_once_with(50)

        # Verify deserialization
        assert len(events) == 1
        assert events[0].event_name == "test_event"
        assert events[0].user_id == "user1"

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_read_returns_empty_list_for_no_events(self, mock_client_class):
        """read() should return empty list when stream has no events."""
        store = FirestoreEventStore(project_id="test")

        # Mock empty query result
        mock_stream = Mock()
        mock_limit = Mock()
        mock_order = Mock()
        mock_collection = Mock()
        mock_doc = Mock()
        mock_items = Mock()

        mock_client_class.return_value.collection.return_value = mock_collection
        mock_collection.document.return_value = mock_doc
        mock_doc.collection.return_value = mock_items
        mock_items.order_by.return_value = mock_order
        mock_order.limit.return_value = mock_limit
        mock_limit.stream.return_value = mock_stream

        # Empty stream
        mock_stream.__iter__ = Mock(return_value=iter([]))

        events = await store.read(stream="mobile", limit=100)

        assert events == []

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_dict_to_event_handles_identify(self, mock_client_class):
        """_dict_to_event should deserialize identify events."""
        store = FirestoreEventStore(project_id="test")

        data = {
            "event_id": "evt-1",
            "user_id": "user1",
            "event_type": "identify",
            "timestamp": "2026-01-10T12:00:00+00:00",
            "traits": {"email": "test@example.com"},
            "properties": {},
            "anonymous_id": None,
            "stream": None,
        }

        event = store._dict_to_event(data)

        assert event.event_type == "identify"
        assert event.user_id == "user1"
        assert event.traits == {"email": "test@example.com"}

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_dict_to_event_handles_page(self, mock_client_class):
        """_dict_to_event should deserialize page events."""
        store = FirestoreEventStore(project_id="test")

        data = {
            "event_id": "evt-1",
            "user_id": "user1",
            "event_type": "page",
            "name": "Home Page",
            "timestamp": "2026-01-10T12:00:00+00:00",
            "properties": {},
            "anonymous_id": None,
            "stream": None,
        }

        event = store._dict_to_event(data)

        assert event.event_type == "page"
        assert event.name == "Home Page"

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_dict_to_event_raises_on_unknown_type(self, mock_client_class):
        """_dict_to_event should raise ValueError for unknown event_type."""
        store = FirestoreEventStore(project_id="test")

        data = {
            "event_id": "evt-1",
            "event_type": "unknown_type",
            "timestamp": "2026-01-10T12:00:00+00:00",
        }

        with pytest.raises(ValueError, match="Unknown event_type: unknown_type"):
            store._dict_to_event(data)


class TestFirestoreErrorStore:
    """Tests for error storage (dead letter queue)."""

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_store_error_writes_to_errors_collection(self, mock_client_class):
        """store_error() should write error doc to errors collection."""
        store = FirestoreErrorStore(project_id="test")

        mock_doc_ref = Mock()
        mock_collection = Mock()
        mock_client_class.return_value.collection.return_value = mock_collection
        mock_collection.document.return_value = mock_doc_ref

        payload = {"userId": "123", "type": "track"}
        error = "Validation failed: missing event name"
        timestamp = datetime(2026, 1, 10, 12, 0, 0, tzinfo=UTC)
        metadata = {"stream": "mobile"}

        await store.store_error(payload, error, timestamp, metadata)

        # Verify correct collection
        mock_collection.document.assert_called_once()
        doc_id = mock_collection.document.call_args[0][0]
        assert doc_id.startswith("2026-01-10T12:00:00+00:00_")

        # Verify error data structure
        mock_doc_ref.set.assert_called_once()
        call_data = mock_doc_ref.set.call_args[0][0]
        assert call_data["payload"] == payload
        assert call_data["error"] == error
        assert call_data["stream"] == "mobile"
        assert call_data["metadata"] == metadata

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_store_error_handles_none_metadata(self, mock_client_class):
        """store_error() should handle None metadata gracefully."""
        store = FirestoreErrorStore(project_id="test")

        mock_doc_ref = Mock()
        mock_collection = Mock()
        mock_client_class.return_value.collection.return_value = mock_collection
        mock_collection.document.return_value = mock_doc_ref

        payload = {"userId": "123"}
        error = "Some error"
        timestamp = datetime(2026, 1, 10, 12, 0, 0, tzinfo=UTC)

        await store.store_error(payload, error, timestamp, metadata=None)

        # Verify stream is None and metadata is empty dict
        call_data = mock_doc_ref.set.call_args[0][0]
        assert call_data["stream"] is None
        assert call_data["metadata"] == {}

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_query_errors_returns_error_list(self, mock_client_class):
        """query_errors() should return list of error documents."""
        store = FirestoreErrorStore(project_id="test")

        # Mock query chain
        mock_stream = Mock()
        mock_limit = Mock()
        mock_order = Mock()
        mock_collection = Mock()

        mock_client_class.return_value.collection.return_value = mock_collection
        mock_collection.order_by.return_value = mock_order
        mock_order.limit.return_value = mock_limit
        mock_limit.stream.return_value = mock_stream

        # Mock error document
        mock_error_doc = Mock()
        mock_error_doc.to_dict.return_value = {
            "payload": {"userId": "123"},
            "error": "Validation failed",
            "timestamp": "2026-01-10T12:00:00+00:00",
            "stream": "mobile",
            "metadata": {},
        }
        mock_stream.__iter__ = Mock(return_value=iter([mock_error_doc]))

        errors = await store.query_errors(limit=50)

        # Verify correct query
        mock_collection.order_by.assert_called_once_with(
            "timestamp", direction=firestore.Query.DESCENDING
        )
        mock_order.limit.assert_called_once_with(50)

        # Verify returned data
        assert len(errors) == 1
        assert errors[0]["payload"] == {"userId": "123"}
        assert errors[0]["error"] == "Validation failed"

    @pytest.mark.asyncio
    @patch("eventkit.stores.firestore.firestore.Client")
    async def test_query_errors_returns_empty_list(self, mock_client_class):
        """query_errors() should return empty list when no errors."""
        store = FirestoreErrorStore(project_id="test")

        # Mock empty query result
        mock_stream = Mock()
        mock_limit = Mock()
        mock_order = Mock()
        mock_collection = Mock()

        mock_client_class.return_value.collection.return_value = mock_collection
        mock_collection.order_by.return_value = mock_order
        mock_order.limit.return_value = mock_limit
        mock_limit.stream.return_value = mock_stream

        # Empty stream
        mock_stream.__iter__ = Mock(return_value=iter([]))

        errors = await store.query_errors(limit=100)

        assert errors == []
"https://files.pythonhosted.org/packages/0a/d4/2b0cd0fe285e14b36db076e78c93766ff1d529d70408bd1d2a5a84f1d929/tenacity-9.1.2.tar.gz", hash = "sha256:1169d376c297e7de388d18b4481760d478b0e99a777cad3a9c86e556f4b697cb", size = 48036, upload-time = "2025-04-02T08:25:09.966Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/30/643397144bfbfec6f6ef821f36f33e57d35946c44a2352d3c9f0ae847619/tenacity-9.1.2-py3-none-any.whl", hash = "sha256:f77bf36710d8b73a50b2dd155c97b870017ad21afe6ab300326b0371b3b05138", size = 28248, upload-time = "2025-04-02T08:25:07.678Z" }, +] + [[package]] name = "typing-extensions" version = "4.15.0"