diff --git a/specs/core-pipeline/api.md b/specs/core-pipeline/api.md new file mode 100644 index 0000000..7572d1f --- /dev/null +++ b/specs/core-pipeline/api.md @@ -0,0 +1,152 @@ +# Core Pipeline: API Reference + +**Version**: 1.0 +**Last Updated**: 2025-01-11 + +--- + +## Core Collection Endpoint + +**Endpoint**: `POST /collect` or `POST /collect/{stream}` + +**Path Parameters**: +- `stream` (optional): Stream name (default: "default") + +**Request Body**: Any valid JSON + +**Response**: `202 Accepted` +```json +{ + "message": "Event received", + "data": { + "received": true, + "stream": "users" + } +} +``` + +**Batch Support**: +```json +POST /collect/events +[ + {"type": "track", "event": "Button Clicked"}, + {"type": "track", "event": "Page Viewed"} +] + +Response: {"message": "Events received", "data": {"received": 2}} +``` + +--- + +## Convenience Endpoints (Segment-Compatible) + +| Endpoint | Stream | Purpose | +|----------|--------|---------| +| `POST /v1/identify` | `users` | User identification and traits | +| `POST /v1/track` | `events` | Event tracking | +| `POST /v1/page` | `pages` | Page views | + +All delegate to `POST /collect/{stream}` with appropriate stream. + +--- + +## Health Endpoints + +| Endpoint | Response | Purpose | +|----------|----------|---------| +| `GET /health` | `200 OK` | Liveness check | +| `GET /ready` | `200 OK` or `503 Service Unavailable` | Readiness check (Firestore connectivity) | + +--- + +## Event Formats + +### Identify Event + +```json +POST /v1/identify +{ + "userId": "user_123", + "traits": { + "email": "user@example.com", + "name": "Alice", + "plan": "premium" + }, + "timestamp": "2025-01-11T12:00:00Z" +} +``` + +**Optional Fields**: +- `anonymousId`: Anonymous identifier (fallback if userId missing) +- `$set`: Profile fields to set (PostHog pattern) +- `$set_once`: Profile fields to set only if not already set + +--- + +### Track Event + +```json +POST /v1/track +{ + "userId": "user_123", + "event": "Button Clicked", + "properties": { + "button_id": "cta_primary", + "page": "/home" + }, + "timestamp": "2025-01-11T12:00:00Z" +} +``` + +**Required Fields**: +- `event`: Event name (string) +- `userId` OR `anonymousId`: At least one identity required + +--- + +### Page Event + +```json +POST /v1/page +{ + "userId": "user_123", + "name": "/home", + "properties": { + "title": "Homepage", + "referrer": "https://google.com" + }, + "timestamp": "2025-01-11T12:00:00Z" +} +``` + +--- + +## Error Responses + +### 400 Bad Request + +```json +{ + "error": "Invalid JSON", + "detail": "Expected object or array, got string" +} +``` + +### 503 Service Unavailable + +```json +{ + "error": "Service unavailable", + "detail": "Firestore connection failed" +} +``` + +**Note**: Invalid events are accepted (202) but stored in error queue—they don't return 4xx errors. 
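
---

## Client Example (Illustrative)

A minimal client sketch for the endpoints above, assuming the service is reachable at `http://localhost:8000`; `httpx` is used here purely for illustration and is not implied to be a project dependency by this spec:

```python
# Illustrative client for the collection API (assumes localhost:8000; httpx is
# an example HTTP client, not a stated project dependency).
import asyncio

import httpx


async def main() -> None:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        # Convenience endpoint: delegates to POST /collect/users
        r = await client.post("/v1/identify", json={
            "userId": "user_123",
            "traits": {"email": "user@example.com", "plan": "premium"},
        })
        assert r.status_code == 202  # events are always accepted

        # Batch of events sent directly to a stream
        r = await client.post("/collect/events", json=[
            {"type": "track", "userId": "user_123", "event": "Button Clicked"},
            {"type": "track", "userId": "user_123", "event": "Page Viewed"},
        ])
        assert r.status_code == 202


if __name__ == "__main__":
    asyncio.run(main())
```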
+ +--- + +## Related Documentation + +- **[Specification](./spec.md)** - User stories and requirements +- **[Data Models](./data-models.md)** - Event schemas +- **[Architecture](./architecture.md)** - Component design diff --git a/specs/core-pipeline/architecture.md b/specs/core-pipeline/architecture.md new file mode 100644 index 0000000..d5eac80 --- /dev/null +++ b/specs/core-pipeline/architecture.md @@ -0,0 +1,391 @@ +# Core Pipeline: Architecture + +**Version**: 1.0 +**Last Updated**: 2025-01-11 + +--- + +## Component Overview + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Collection API (FastAPI) │ +│ POST /collect/{stream} │ +└───────────────────────────┬─────────────────────────────────┘ + │ RawEvent + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Processor │ +│ (Main Orchestrator) │ +└───────────┬──────────────────────────────────┬──────────────┘ + │ │ + ↓ ↓ + ┌───────────────┐ ┌──────────────┐ + │ Adapter │ │ Error Store │ + │ (Validate) │─────────────────→│ (Errors) │ + └───────┬───────┘ Invalid Event └──────────────┘ + │ + │ TypedEvent + ↓ + ┌───────────────┐ + │ Sequencer │ + │ (Hash Route) │ + └───────┬───────┘ + │ partition_id + ↓ + ┌───────────────┐ + │ Buffer │ + │ (Batching) │ + └───────┬───────┘ + │ Batch + ↓ + ┌───────────────┐ + │ Event Store │ + │ (Firestore) │ + └───────────────┘ +``` + +--- + +## 1. Collection API (FastAPI) + +**Responsibility**: Accept HTTP requests, create RawEvents + +```python +@router.post("/collect/{stream}") +async def collect( + request: Request, + stream: str = "default", + processor: Processor = Depends(get_processor) +) -> dict: + payload = await request.json() + raw_event = RawEvent(payload=payload, stream=stream) + await processor.enqueue(raw_event) + return {"message": "Event received", "data": {"received": True}} +``` + +**Design Decision**: Never reject. Always return 202 Accepted. + +--- + +## 2. Adapter (Validation & Normalization) + +**Responsibility**: Convert RawEvent → TypedEvent or AdapterResult.err() + +```python +class SegmentAdapter: + def adapt(self, raw: RawEvent) -> AdapterResult: + event_type = raw.get("type") + + if event_type == "identify": + return self._adapt_identify(raw) + elif event_type == "track": + return self._adapt_track(raw) + else: + return AdapterResult.err(f"Unknown type: {event_type}") +``` + +**Validation Strategy**: Lightweight checks (field presence, types), not strict schema enforcement. + +### Adapter Examples + +**Success Case**: +```python +raw = RawEvent(payload={"type": "identify", "userId": "123"}) +result = adapter.adapt(raw) +# result.success = True +# result.event = IdentifyEvent(userId="123", ...) +``` + +**Error Case**: +```python +raw = RawEvent(payload={"type": "identify"}) # Missing userId +result = adapter.adapt(raw) +# result.success = False +# result.error = "Missing required field: userId or anonymousId" +``` + +--- + +## 3. 
Sequencer (Hash-Based Routing) + +**Responsibility**: Route events to partitions by identity + +```python +class HashSequencer: + def __init__(self, num_partitions: int = 16): + self.num_partitions = num_partitions + + def get_partition_id(self, event: TypedEvent) -> int: + routing_key = self._get_routing_key(event) + hash_value = hash(routing_key) + return hash_value % self.num_partitions + + def _get_routing_key(self, event: TypedEvent) -> str: + # Use userId or anonymousId for routing + return event.userId or event.anonymousId or "" +``` + +**Algorithm**: Python's built-in `hash()` (fast, good distribution) + +**Example**: +```python +event = IdentifyEvent(userId="alice@example.com", ...) +partition_id = sequencer.get_partition_id(event) +# Always routes to same partition (e.g., 7) +``` + +--- + +## 4. Buffer (Batching) + +**Responsibility**: Batch events before writing to storage + +```python +class Buffer: + def __init__( + self, + event_store: EventStore, + storage: BufferStorage | None = None, + size: int = 100, + max_size: int = 1000, + timeout: float = 5.0 + ): + ... + + async def enqueue(self, event: TypedEvent, partition_id: int): + async with self.lock: + await self.storage.append(partition_id, event) + + if self.storage.len(partition_id) >= self.size: + await self._flush_partition(partition_id) + + async def _flush_partition(self, partition_id: int): + events = await self.storage.get_all(partition_id) + await self.event_store.store_batch(events) + await self.storage.clear(partition_id) +``` + +**Flush Triggers**: +1. **Size threshold** (100 events) - Immediate flush when buffer full +2. **Time threshold** (5 seconds) - Background task flushes periodically +3. **Shutdown** - Graceful flush on stop + +**Concurrency**: Uses `asyncio.Lock` to prevent race conditions during concurrent enqueues. + +--- + +## 5. EventStore (Storage Interface) + +**Responsibility**: Abstract interface for event persistence + +```python +class EventStore(Protocol): + async def store(self, event: TypedEvent) -> None: ... + async def store_batch(self, events: list[TypedEvent]) -> None: ... + async def read(self, stream: str, limit: int = 100) -> list[TypedEvent]: ... + async def query(self, filters: dict) -> list[TypedEvent]: ... +``` + +**Firestore Implementation**: +- Collection: `events` +- Document ID: `{stream}/{timestamp}_{uuid}` +- Batch writes (max 500 per batch) + +--- + +## 6. ErrorStore (Dead Letter Queue) + +**Responsibility**: Store invalid events for debugging + +```python +class ErrorStore(Protocol): + async def write_error( + self, + raw: RawEvent, + error: str, + stream: str | None = None + ) -> None: ... + async def query_errors(self, limit: int = 100) -> list[dict]: ... +``` + +**Firestore Implementation**: +- Collection: `errors` +- Fields: `raw_payload`, `error`, `timestamp`, `stream` + +--- + +## 7. Processor (Main Orchestrator) + +**Responsibility**: Wire all components together + +```python +class Processor: + def __init__( + self, + adapter: EventAdapter, + sequencer: Sequencer, + buffer: Buffer, + error_store: ErrorStore + ): + ... + + async def enqueue(self, raw_event: RawEvent): + # Adapt + result = self.adapter.adapt(raw_event) + + if not result.success: + await self.error_store.write_error(raw_event, result.error) + return + + # Sequence + partition_id = self.sequencer.get_partition_id(result.event) + + # Buffer + await self.buffer.enqueue(result.event, partition_id) +``` + +**Error Handling**: Never raise exceptions. Invalid events → error store. 
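
For example, a payload missing its identity fields flows into the error store rather than surfacing as an exception. A minimal sketch, assuming a `processor` and `error_store` wired as in the sections above (the exact error text is illustrative):

```python
# Hedged sketch of the never-raise contract. `processor` and `error_store` are
# assumed to be constructed as shown earlier in this document.
raw = RawEvent(payload={"type": "identify"})  # no userId or anonymousId

await processor.enqueue(raw)  # returns normally; nothing is raised

errors = await error_store.query_errors(limit=10)
assert "userId" in errors[0]["error"]  # e.g. "Missing required field: userId or anonymousId"
```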
+ +**Lifecycle**: +```python +# Startup +await buffer.start_flusher() + +# Process events +await processor.enqueue(raw_event) + +# Shutdown +await buffer.stop_flusher() +``` + +--- + +## Configuration + +### Environment Variables + +| Variable | Required | Default | Description | +|----------|----------|---------|-------------| +| `GCP_PROJECT_ID` | Yes | - | Google Cloud Project ID | +| `FIRESTORE_DATABASE` | No | `"default"` | Firestore database name | +| `BUFFER_SIZE` | No | `100` | Events per buffer before flush | +| `BUFFER_TIMEOUT` | No | `5.0` | Seconds before time-based flush | +| `BUFFER_MAX_SIZE` | No | `1000` | Hard limit per partition (10x buffer size) | +| `NUM_PARTITIONS` | No | `16` | Number of sequencer partitions | +| `EVENTKIT_QUEUE_MODE` | No | `"direct"` | Queue backend: `direct`, `async`, `pubsub` (v0.2.0+) | +| `LOG_LEVEL` | No | `"INFO"` | Logging level (DEBUG, INFO, WARNING, ERROR) | + +**Queue Modes** (future, v0.2.0+): +- `direct` - No queue, inline processing (v0.1.0 default) +- `async` - In-process `asyncio.Queue` for multi-worker single-server +- `pubsub` - GCP Pub/Sub for distributed, horizontally scalable processing + +### Dependency Injection + +```python +# main.py +def get_processor() -> Processor: + event_store = FirestoreEventStore( + project_id=os.getenv("GCP_PROJECT_ID"), + database=os.getenv("FIRESTORE_DATABASE", "default") + ) + + error_store = FirestoreErrorStore(...) + + buffer = Buffer( + event_store=event_store, + size=int(os.getenv("BUFFER_SIZE", "100")), + timeout=float(os.getenv("BUFFER_TIMEOUT", "5.0")) + ) + + sequencer = HashSequencer( + num_partitions=int(os.getenv("NUM_PARTITIONS", "16")) + ) + + adapter = SegmentAdapter() + + return Processor(adapter, sequencer, buffer, error_store) +``` + +--- + +## Pluggable Design + +### BufferStorage Protocol + +The buffer's internal storage is pluggable: + +```python +class BufferStorage(Protocol): + async def append(self, partition_id: int, event: TypedEvent) -> None: ... + async def get_all(self, partition_id: int) -> list[TypedEvent]: ... + async def clear(self, partition_id: int) -> None: ... + def len(self, partition_id: int) -> int: ... + def partitions(self) -> list[int]: ... +``` + +**Implementations**: +- `InMemoryBufferStorage` (default) - Fast, volatile +- `DiskBufferStorage` (future) - Persistent, larger capacity +- `RedisBufferStorage` (future) - Distributed, shared state + +--- + +## Future: Queue Architecture (v0.2.0+) + +For horizontal scaling and decoupled API/worker architecture, `eventkit` will support pluggable queue backends. + +### EventQueue Protocol + +```python +class EventQueue(Protocol): + async def publish(self, event: TypedEvent, partition_id: int) -> None: ... + async def consume(self, partition_id: int) -> TypedEvent: ... +``` + +### Queue Modes + +Configure via `EVENTKIT_QUEUE_MODE` environment variable: + +| Mode | Description | Use Case | Cost | +|------|-------------|----------|------| +| `direct` | No queue, inline processing | v0.1.0 default, single server | Free | +| `async` | In-process `asyncio.Queue` | Multi-worker single server | Free | +| `pubsub` | GCP Pub/Sub | Production, horizontal scaling | Free tier: 10 GB/month | + +### Architecture Evolution + +**v0.1.0 (Current) - DirectQueue**: +``` +API → Processor → Buffer → Storage +``` + +**v0.2.0+ - PubSubQueue**: +``` +API → Pub/Sub → Worker(s) → Buffer → Storage + ↓ + Horizontal scaling +``` + +### Why Pub/Sub? + +1. **Free for learning scale** - 10 GB/month = 333k events/day on free tier +2. 
**Production-ready** - Same pattern as Lytics, PostHog, RudderStack +3. **Horizontal scaling** - Add workers independently of API servers +4. **Fault tolerance** - Queue persists events during worker downtime +5. **Zero infrastructure** - Fully managed by GCP + +### Implementation Notes + +See detailed architecture diagrams and cost analysis in [queue architecture note](../../notes/projects/eventkit-impl/007-queue-architecture.md). + +For plugin strategy (extracting queue implementations into separate packages), see [queue plugins note](../../notes/projects/eventkit-impl/008-queue-plugins.md). + +--- + +## Related Documentation + +- **[Specification](./spec.md)** - User stories and requirements +- **[API Reference](./api.md)** - Endpoints and formats +- **[Data Models](./data-models.md)** - Event schemas diff --git a/specs/core-pipeline/data-models.md b/specs/core-pipeline/data-models.md new file mode 100644 index 0000000..247b8be --- /dev/null +++ b/specs/core-pipeline/data-models.md @@ -0,0 +1,204 @@ +# Core Pipeline: Data Models + +**Version**: 1.0 +**Last Updated**: 2025-01-11 + +--- + +## RawEvent (Ingestion) + +```python +class RawEvent(BaseModel): + model_config = ConfigDict(extra="allow") # Accept any fields + + payload: dict[str, Any] + received_at: datetime + stream: str | None = None + + def get(self, key: str, default: Any = None) -> Any: + """Helper to extract fields from payload""" +``` + +**Philosophy**: Accept everything, validate nothing. Validation happens downstream. + +**Example**: +```python +raw = RawEvent( + payload={"type": "identify", "userId": "123"}, + received_at=datetime.now(UTC), + stream="users" +) + +user_id = raw.get("userId") # "123" +``` + +--- + +## TypedEvent (Processing) + +Base class for all validated events: + +```python +class TypedEvent(BaseModel): + type: str + timestamp: datetime + userId: str | None = None + anonymousId: str | None = None +``` + +### IdentifyEvent + +```python +class IdentifyEvent(TypedEvent): + type: Literal["identify"] = "identify" + traits: dict[str, Any] + # PostHog patterns for profile updates + set: dict[str, Any] | None = Field(None, alias="$set") + set_once: dict[str, Any] | None = Field(None, alias="$set_once") +``` + +**Example**: +```python +event = IdentifyEvent( + userId="user_123", + timestamp=datetime.now(UTC), + traits={"email": "user@example.com", "plan": "premium"}, + set={"name": "Alice"} +) +``` + +--- + +### TrackEvent + +```python +class TrackEvent(TypedEvent): + type: Literal["track"] = "track" + event_name: str # Event name, e.g., "Button Clicked" + properties: dict[str, Any] +``` + +**Example**: +```python +event = TrackEvent( + userId="user_123", + timestamp=datetime.now(UTC), + event_name="Button Clicked", + properties={"button_id": "cta_primary"} +) +``` + +--- + +### PageEvent + +```python +class PageEvent(TypedEvent): + type: Literal["page"] = "page" + name: str # Page name, e.g., "/home" + properties: dict[str, Any] +``` + +**Example**: +```python +event = PageEvent( + userId="user_123", + timestamp=datetime.now(UTC), + name="/home", + properties={"title": "Homepage"} +) +``` + +--- + +## Ref (Identity) + +```python +class Ref(BaseModel): + field: str # e.g., "userId", "email", "anonymousId" + value: str # e.g., "user_123" + +def extract_refs(event: TypedEvent) -> list[Ref]: + """Extract identity references from event""" + refs = [] + if event.userId: + refs.append(Ref(field="userId", value=event.userId)) + if event.anonymousId: + refs.append(Ref(field="anonymousId", value=event.anonymousId)) + 
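    # Illustrative note (not implemented): further identifiers such as email could
    # be appended as refs here to support future identity-graph building.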
return refs +``` + +**Why Refs Matter**: Enables flexible identity (not just userId) and future identity graph building. + +**Example**: +```python +event = IdentifyEvent(userId="user_123", anonymousId="anon_456", ...) +refs = extract_refs(event) +# [Ref(field="userId", value="user_123"), Ref(field="anonymousId", value="anon_456")] +``` + +--- + +## AdapterResult + +```python +@dataclass +class AdapterResult: + success: bool + event: TypedEvent | None + error: str | None + + @classmethod + def ok(cls, event: TypedEvent) -> "AdapterResult": + return cls(success=True, event=event, error=None) + + @classmethod + def err(cls, error: str) -> "AdapterResult": + return cls(success=False, event=None, error=error) +``` + +**Example**: +```python +# Success case +result = AdapterResult.ok(event) +if result.success: + print(f"Adapted: {result.event.type}") + +# Error case +result = AdapterResult.err("Missing userId") +if not result.success: + await error_store.write(raw_event, result.error) +``` + +--- + +## Normalization Rules + +### Field Name Normalization + +| Input | Output | Reason | +|-------|--------|--------| +| `user_id` | `userId` | Segment standard (camelCase) | +| `anonymous_id` | `anonymousId` | Segment standard | +| `ts` | `timestamp` | Explicit naming | +| `event` | `event_name` | Avoid Python keyword conflict | + +### Timestamp Parsing + +```python +# Accepts: +"2025-01-11T12:00:00Z" # ISO 8601 string +1704848400 # Unix timestamp (seconds) +1704848400000 # Unix timestamp (milliseconds) + +# Outputs: +datetime(2025, 1, 11, 12, 0, 0, tzinfo=UTC) +``` + +--- + +## Related Documentation + +- **[Specification](./spec.md)** - User stories and requirements +- **[API Reference](./api.md)** - Endpoints and formats +- **[Architecture](./architecture.md)** - Component design diff --git a/specs/core-pipeline/plan.md b/specs/core-pipeline/plan.md index 0d9fb5e..9f9ddf3 100644 --- a/specs/core-pipeline/plan.md +++ b/specs/core-pipeline/plan.md @@ -287,7 +287,11 @@ class SegmentAdapter: |------|---------|------------| | `src/eventkit/processing/sequencer.py` | Hash-based routing for consistent ordering | Story 4 | | `src/eventkit/processing/buffer.py` | Batching for efficient writes | Story 5 | -| `src/eventkit/processing/processor.py` | Main orchestrator | All | +| `src/eventkit/processing/buffer_storage.py` | BufferStorage Protocol (pluggable) | Story 5 | +| `src/eventkit/processing/processor.py` | Main orchestrator (queue-agnostic) | All | +| `src/eventkit/queues/base.py` | EventQueue Protocol | Future (v0.2.0+) | +| `src/eventkit/queues/direct.py` | DirectQueue implementation | All | +| `src/eventkit/queues/factory.py` | Queue factory (config-based) | All | **Key Code Patterns**: @@ -377,6 +381,16 @@ class Buffer: ```python # Processor - Wire everything together class Processor: + """ + Main event processor (queue-agnostic). + + Design Decision: Processor doesn't know about queues. + v0.1.0: DirectQueue calls process_event() inline + v0.2.0+: AsyncQueue/PubSubQueue workers call process_event() + + See: notes/projects/eventkit-impl/006-processor-orchestrator.md + """ + def __init__( self, adapter: EventAdapter, @@ -389,14 +403,23 @@ class Processor: self.buffer = buffer self.error_store = error_store - async def enqueue(self, raw_event: RawEvent): - """Process event through the pipeline""" + async def process_event(self, raw_event: RawEvent) -> None: + """ + Core processing logic (queue-agnostic). + + This is the method that queues call. 
The processor doesn't + know or care which queue is calling it. + """ # Story 3: Adapt result = self.adapter.adapt(raw_event) if not result.success: # Story 6: Invalid events → error store (not exception) - await self.error_store.write_error(raw_event, result.error) + await self.error_store.write_error( + raw_event, + result.error, + raw_event.stream + ) return # Story 4: Sequence (hash-based routing) @@ -404,8 +427,89 @@ class Processor: # Story 5: Buffer (batch processing) await self.buffer.enqueue(result.event, partition_id) + + async def start(self) -> None: + """Start background tasks (buffer flusher).""" + await self.buffer.start_flusher() + + async def stop(self) -> None: + """Graceful shutdown (flush all buffers).""" + await self.buffer.stop_flusher() +``` + +**Queue Architecture (v0.1.0 Foundation)**: + +Even though v0.1.0 only implements DirectQueue, we design for future queue pluggability: + +```python +# src/eventkit/queues/base.py +class EventQueue(Protocol): + """Protocol for queue implementations (all queues must implement).""" + + async def enqueue(self, event: RawEvent) -> None: + """Add event to queue.""" + ... + + async def start(self) -> None: + """Start queue and processor.""" + ... + + async def stop(self) -> None: + """Stop queue gracefully.""" + ... ``` +```python +# src/eventkit/queues/direct.py +class DirectQueue: + """ + Direct (inline) queue - v0.1.0 implementation. + + Processes events immediately without actual queueing. + """ + + def __init__(self, processor: Processor): + self.processor = processor + + async def enqueue(self, event: RawEvent) -> None: + """Process immediately (no queue).""" + await self.processor.process_event(event) + + async def start(self) -> None: + """Start processor background tasks.""" + await self.processor.start() + + async def stop(self) -> None: + """Stop processor gracefully.""" + await self.processor.stop() +``` + +```python +# src/eventkit/queues/factory.py +def create_queue(processor: Processor, settings: Settings) -> EventQueue: + """Factory that creates queue based on config.""" + + if settings.queue_mode == QueueMode.DIRECT: + return DirectQueue(processor) + + # Future: AsyncQueue, PubSubQueue + else: + raise ValueError(f"Unknown queue mode: {settings.queue_mode}") +``` + +**Why This Design**: +1. **Processor is reusable** - Same `process_event()` logic works with all queues +2. **Easy to swap** - Change env var, get different queue +3. **Testable** - Mock EventQueue interface in tests +4. 
**FastAPI agnostic** - Routes depend on `EventQueue` interface, not implementation + +**Evolution Path**: +- v0.1.0: DirectQueue (inline processing) +- v0.2.0: AsyncQueue (in-process workers with asyncio.Queue) +- v0.3.0: PubSubQueue (distributed workers with GCP Pub/Sub) + +See: `notes/projects/eventkit-impl/007-queue-architecture.md` and `008-queue-plugins.md` + **Testing Strategy**: - Unit tests for Sequencer (consistent routing) - Unit tests for Buffer (size/time-based flushing) @@ -660,7 +764,13 @@ eventkit/ │ │ ├── __init__.py │ │ ├── sequencer.py # Sequencer (Phase 4) │ │ ├── buffer.py # Buffer (Phase 4) +│ │ ├── buffer_storage.py # BufferStorage Protocol (Phase 4) │ │ └── processor.py # Processor (Phase 4) +│ ├── queues/ +│ │ ├── __init__.py +│ │ ├── base.py # EventQueue Protocol (Phase 4) +│ │ ├── direct.py # DirectQueue (Phase 4) +│ │ └── factory.py # Queue factory (Phase 4) │ ├── stores/ │ │ ├── __init__.py │ │ ├── event_store.py # Protocol (Phase 1) diff --git a/specs/core-pipeline/spec.md b/specs/core-pipeline/spec.md index cda55b4..a8e0ca5 100644 --- a/specs/core-pipeline/spec.md +++ b/specs/core-pipeline/spec.md @@ -2,7 +2,7 @@ **Version**: 1.0 **Status**: Planning -**Last Updated**: 2025-01-09 +**Last Updated**: 2025-01-11 --- @@ -269,303 +269,6 @@ readinessProbe: --- -## API Specification - -### Core Collection Endpoint - -**Endpoint**: `POST /collect` or `POST /collect/{stream}` - -**Path Parameters**: -- `stream` (optional): Stream name (default: "default") - -**Request Body**: Any valid JSON - -**Response**: `202 Accepted` -```json -{ - "message": "Event received", - "data": { - "received": true, - "stream": "users" - } -} -``` - -**Batch Support**: -```json -POST /collect/events -[ - {"type": "track", "event": "Button Clicked"}, - {"type": "track", "event": "Page Viewed"} -] - -Response: {"message": "Events received", "data": {"received": 2}} -``` - ---- - -### Convenience Endpoints (Segment-Compatible) - -| Endpoint | Stream | Purpose | -|----------|--------|---------| -| `POST /v1/identify` | `users` | User identification and traits | -| `POST /v1/track` | `events` | Event tracking | -| `POST /v1/page` | `pages` | Page views | - -All delegate to `POST /collect/{stream}` with appropriate stream. - ---- - -### Health Endpoints - -| Endpoint | Response | Purpose | -|----------|----------|---------| -| `GET /health` | `200 OK` | Liveness check | -| `GET /ready` | `200 OK` or `503 Service Unavailable` | Readiness check (Firestore connectivity) | - ---- - -## Data Models - -### RawEvent (Ingestion) - -```python -class RawEvent(BaseModel): - model_config = ConfigDict(extra="allow") # Accept any fields - - payload: dict[str, Any] - received_at: datetime - stream: str | None = None - - def get(self, key: str, default: Any = None) -> Any: - """Helper to extract fields from payload""" -``` - -**Philosophy**: Accept everything, validate nothing. Validation happens downstream. 
- ---- - -### TypedEvent (Processing) - -Base class for all validated events: - -```python -class TypedEvent(BaseModel): - type: str - timestamp: datetime - userId: str | None = None - anonymousId: str | None = None -``` - -**Concrete Types**: - -```python -class IdentifyEvent(TypedEvent): - type: Literal["identify"] = "identify" - traits: dict[str, Any] - # PostHog patterns for profile updates - set: dict[str, Any] | None = Field(None, alias="$set") - set_once: dict[str, Any] | None = Field(None, alias="$set_once") - -class TrackEvent(TypedEvent): - type: Literal["track"] = "track" - event: str # Event name, e.g., "Button Clicked" - properties: dict[str, Any] - -class PageEvent(TypedEvent): - type: Literal["page"] = "page" - name: str # Page name, e.g., "/home" - properties: dict[str, Any] -``` - ---- - -### Ref (Identity) - -```python -class Ref(BaseModel): - field: str # e.g., "userId", "email", "anonymousId" - value: str # e.g., "user_123" - -def extract_refs(event: TypedEvent) -> list[Ref]: - """Extract identity references from event""" - refs = [] - if event.userId: - refs.append(Ref(field="userId", value=event.userId)) - if event.anonymousId: - refs.append(Ref(field="anonymousId", value=event.anonymousId)) - return refs -``` - -**Why Refs Matter**: Enables flexible identity (not just userId) and future identity graph building. - ---- - -## Architecture Components - -### 1. Collection API (FastAPI) - -**Responsibility**: Accept HTTP requests, create RawEvents - -```python -@router.post("/collect/{stream}") -async def collect( - request: Request, - stream: str = "default", - processor: Processor = Depends(get_processor) -) -> dict: - payload = await request.json() - raw_event = RawEvent(payload=payload, stream=stream) - await processor.enqueue(raw_event) - return {"message": "Event received", "data": {"received": True}} -``` - -**Design Decision**: Never reject. Always return 202 Accepted. - ---- - -### 2. Adapter (Validation & Normalization) - -**Responsibility**: Convert RawEvent → TypedEvent or AdapterResult.err() - -```python -class SegmentAdapter: - def adapt(self, raw: RawEvent) -> AdapterResult: - event_type = raw.get("type") - - if event_type == "identify": - return self._adapt_identify(raw) - elif event_type == "track": - return self._adapt_track(raw) - else: - return AdapterResult.err(f"Unknown type: {event_type}") -``` - -**Validation Strategy**: Lightweight checks (field presence, types), not strict schema enforcement. - ---- - -### 3. Sequencer (Hash-Based Routing) - -**Responsibility**: Route events to partitions by identity - -```python -class Sequencer: - def __init__(self, num_partitions: int = 16): - self.num_partitions = num_partitions - - def get_partition_id(self, event: TypedEvent) -> int: - routing_key = self._get_routing_key(event) - hash_value = fnv1a_hash(routing_key) - return hash_value % self.num_partitions - - def _get_routing_key(self, event: TypedEvent) -> str: - # Use userId or anonymousId for routing - return event.userId or event.anonymousId or hash(event.model_dump_json()) -``` - -**Algorithm**: FNV-1a hash (fast, good distribution) - ---- - -### 4. 
Buffer (Batching) - -**Responsibility**: Batch events before writing to storage - -```python -class Buffer: - def __init__(self, event_store: EventStore, size: int = 100, timeout: float = 5.0): - self.buffers: dict[int, list[TypedEvent]] = defaultdict(list) - self.size = size - self.timeout = timeout - - async def enqueue(self, event: TypedEvent, partition_id: int): - self.buffers[partition_id].append(event) - - if len(self.buffers[partition_id]) >= self.size: - await self._flush_partition(partition_id) - - async def _flush_partition(self, partition_id: int): - events = self.buffers[partition_id] - await self.event_store.write(events) - self.buffers[partition_id].clear() -``` - -**Flush Triggers**: -1. Size threshold (100 events) -2. Time threshold (5 seconds) -3. Shutdown (graceful flush) - ---- - -### 5. EventStore (Storage Interface) - -**Responsibility**: Abstract interface for event persistence - -```python -class EventStore(Protocol): - async def write(self, events: list[TypedEvent]) -> None: ... - async def read(self, stream: str, limit: int = 100) -> list[TypedEvent]: ... - async def query(self, filters: dict) -> list[TypedEvent]: ... -``` - -**Firestore Implementation**: -- Collection: `events` -- Document ID: `{stream}/{timestamp}_{uuid}` -- Batch writes (max 500 per batch) - ---- - -### 6. ErrorStore (Dead Letter Queue) - -**Responsibility**: Store invalid events for debugging - -```python -class ErrorStore(Protocol): - async def write_error(self, raw: RawEvent, error: str) -> None: ... - async def query_errors(self, limit: int = 100) -> list[dict]: ... -``` - -**Firestore Implementation**: -- Collection: `errors` -- Fields: `raw_payload`, `error`, `timestamp`, `stream` - ---- - -### 7. Processor (Main Orchestrator) - -**Responsibility**: Wire all components together - -```python -class Processor: - def __init__( - self, - adapter: EventAdapter, - sequencer: Sequencer, - buffer: Buffer, - error_store: ErrorStore - ): - ... - - async def enqueue(self, raw_event: RawEvent): - # Adapt - result = self.adapter.adapt(raw_event) - - if not result.success: - await self.error_store.write_error(raw_event, result.error) - return - - # Sequence - partition_id = self.sequencer.get_partition_id(result.event) - - # Buffer - await self.buffer.enqueue(result.event, partition_id) -``` - -**Error Handling**: Never raise exceptions. Invalid events → error store. - ---- - ## Non-Functional Requirements ### Performance @@ -598,21 +301,6 @@ class Processor: --- -## Configuration - -### Environment Variables - -| Variable | Required | Default | Description | -|----------|----------|---------|-------------| -| `GCP_PROJECT_ID` | Yes | - | Google Cloud Project ID | -| `FIRESTORE_DATABASE` | No | `"default"` | Firestore database name | -| `BUFFER_SIZE` | No | `100` | Events per buffer before flush | -| `BUFFER_TIMEOUT` | No | `5.0` | Seconds before time-based flush | -| `NUM_PARTITIONS` | No | `16` | Number of sequencer partitions | -| `LOG_LEVEL` | No | `"INFO"` | Logging level (DEBUG, INFO, WARNING, ERROR) | - ---- - ## Scope Boundaries ### In Scope (v0.1.0) @@ -685,6 +373,14 @@ Production CDP patterns studied: --- +## Additional Documentation + +- **[API Reference](./api.md)** - Endpoints, request/response formats +- **[Data Models](./data-models.md)** - Event schemas and types +- **[Architecture](./architecture.md)** - Component design and configuration + +--- + ## Open Questions 1. **Deduplication**: Should v0.1.0 include event deduplication, or defer to v0.2.0? 
diff --git a/src/eventkit/adapters/base.py b/src/eventkit/adapters/base.py index 46ed670..bd785af 100644 --- a/src/eventkit/adapters/base.py +++ b/src/eventkit/adapters/base.py @@ -5,11 +5,12 @@ TypedEvent model, decoupling ingestion from processing logic. """ -from typing import Any, Protocol +from typing import Protocol from pydantic import BaseModel from eventkit.schema.events import TypedEvent +from eventkit.schema.raw import RawEvent class AdapterResult(BaseModel): @@ -50,17 +51,17 @@ class EventAdapter(Protocol): Implementation pattern: class SegmentAdapter: - def adapt(self, payload: dict[str, Any]) -> AdapterResult: + def adapt(self, raw: RawEvent) -> AdapterResult: # Convert Segment format to TypedEvent ... """ - def adapt(self, payload: dict[str, Any]) -> AdapterResult: + def adapt(self, raw: RawEvent) -> AdapterResult: """ - Convert a raw payload to a TypedEvent. + Convert a raw event to a TypedEvent. Args: - payload: The raw event payload + raw: The raw event Returns: AdapterResult with converted event or error diff --git a/src/eventkit/config.py b/src/eventkit/config.py index 6e1f7c4..5229e97 100644 --- a/src/eventkit/config.py +++ b/src/eventkit/config.py @@ -5,9 +5,25 @@ type validation and sensible defaults. """ +from enum import Enum + from pydantic_settings import BaseSettings, SettingsConfigDict +class QueueMode(str, Enum): + """ + Queue backend mode for event processing. + + DIRECT: Inline processing (no actual queue) - default + ASYNC: In-process workers with asyncio.Queue + PUBSUB: Distributed workers with GCP Pub/Sub (future) + """ + + DIRECT = "direct" + ASYNC = "async" + PUBSUB = "pubsub" + + class Settings(BaseSettings): """ Application settings loaded from environment variables. @@ -39,5 +55,9 @@ class Settings(BaseSettings): # Sequencer configuration (story 7 - routing) NUM_PARTITIONS: int = 16 # Hash buckets for consistent routing + # Queue configuration + EVENTKIT_QUEUE_MODE: QueueMode = QueueMode.DIRECT # Queue backend mode + EVENTKIT_ASYNC_WORKERS: int = 4 # Number of workers for AsyncQueue mode + # Logging LOG_LEVEL: str = "INFO" diff --git a/src/eventkit/processing/__init__.py b/src/eventkit/processing/__init__.py index 4ac6083..af63135 100644 --- a/src/eventkit/processing/__init__.py +++ b/src/eventkit/processing/__init__.py @@ -1,6 +1,15 @@ """Event processing primitives.""" from eventkit.processing.buffer import Buffer +from eventkit.processing.buffer_storage import BufferStorage, InMemoryBufferStorage +from eventkit.processing.processor import Processor from eventkit.processing.sequencer import HashSequencer, Sequencer -__all__ = ["Buffer", "Sequencer", "HashSequencer"] +__all__ = [ + "Buffer", + "BufferStorage", + "InMemoryBufferStorage", + "Processor", + "Sequencer", + "HashSequencer", +] diff --git a/src/eventkit/processing/processor.py b/src/eventkit/processing/processor.py new file mode 100644 index 0000000..a910d17 --- /dev/null +++ b/src/eventkit/processing/processor.py @@ -0,0 +1,137 @@ +""" +Main Event Processor - Orchestrates the event processing pipeline. + +The Processor is queue-agnostic and handles the core business logic: +1. Adapt: RawEvent → TypedEvent (validation & normalization) +2. Sequence: Assign partition based on identity (consistent routing) +3. Buffer: Enqueue to partition buffer (batch writes) +4. 
Error Handling: Invalid events → error store (never throw exceptions) + +Design Philosophy: +- Processor doesn't know about queues (queue-agnostic) +- All dependencies injected via constructor (testable) +- Never throws exceptions (fail-fast to error store) +- Lifecycle management (start/stop for graceful shutdown) +""" + +from datetime import UTC, datetime + +from eventkit.adapters.base import EventAdapter +from eventkit.processing.buffer import Buffer +from eventkit.processing.sequencer import Sequencer +from eventkit.schema.raw import RawEvent +from eventkit.stores.error_store import ErrorStore + + +class Processor: + """ + Main event processor (orchestrator). + + Wires together all processing components: + - Adapter: Validates and normalizes events + - Sequencer: Routes events to partitions + - Buffer: Batches events for efficient writes + - ErrorStore: Handles invalid events + + The processor is queue-agnostic - it doesn't know whether it's being + called by DirectQueue (inline), AsyncQueue (workers), or PubSubQueue + (distributed). This separation enables easy scaling. + + Example: + >>> from eventkit.processing.processor import Processor + >>> + >>> processor = Processor( + ... adapter=SegmentSchemaAdapter(), + ... sequencer=HashSequencer(num_partitions=16), + ... buffer=Buffer(event_store, size=100, timeout=5.0), + ... error_store=FirestoreErrorStore(...) + ... ) + >>> + >>> # Lifecycle + >>> await processor.start() + >>> + >>> # Process events (called by queue) + >>> await processor.process_event(raw_event) + >>> + >>> # Shutdown + >>> await processor.stop() + """ + + def __init__( + self, + adapter: EventAdapter, + sequencer: Sequencer, + buffer: Buffer, + error_store: ErrorStore, + ) -> None: + """ + Initialize processor with dependencies. + + Args: + adapter: EventAdapter for validation & normalization + sequencer: Sequencer for consistent partition routing + buffer: Buffer for batching events before writes + error_store: ErrorStore for invalid events + """ + self.adapter = adapter + self.sequencer = sequencer + self.buffer = buffer + self.error_store = error_store + + async def process_event(self, raw_event: RawEvent) -> None: + """ + Process a single event through the pipeline. + + This is the core processing logic that queues call. The processor + doesn't know which queue is calling it (DirectQueue, AsyncQueue, + or PubSubQueue). + + Pipeline: + 1. Adapt: RawEvent → TypedEvent (validation) + 2. Sequence: Assign partition based on identity + 3. Buffer: Enqueue to partition buffer + + Error Handling: + - Invalid events → error_store (not exceptions) + - Never throws exceptions to caller + + Args: + raw_event: RawEvent to process + """ + # Step 1: Adapt (validate & normalize) + result = self.adapter.adapt(raw_event) + + if not result.ok: + # Invalid event → error store + await self.error_store.store_error( + payload=raw_event.payload, + error=result.error or "Unknown error", + timestamp=datetime.now(UTC), + metadata={"stream": raw_event.stream}, + ) + return + + # Type narrowing: If ok=True, event must be present + assert result.event is not None, "Adapter returned ok=True but event is None" + + # Step 2: Sequence (consistent routing) + partition_id = self.sequencer.get_partition_id(result.event) + + # Step 3: Buffer (batch writes) + await self.buffer.enqueue(result.event, partition_id) + + async def start(self) -> None: + """ + Start processor background tasks. + + Currently starts the buffer's time-based flusher task. 
+ """ + await self.buffer.start_flusher() + + async def stop(self) -> None: + """ + Stop processor gracefully. + + Ensures all buffered events are flushed before shutdown. + """ + await self.buffer.stop_flusher() diff --git a/src/eventkit/queues/__init__.py b/src/eventkit/queues/__init__.py new file mode 100644 index 0000000..9de7254 --- /dev/null +++ b/src/eventkit/queues/__init__.py @@ -0,0 +1,8 @@ +"""Event queue abstractions for pluggable queue backends.""" + +from eventkit.queues.async_queue import AsyncQueue +from eventkit.queues.base import EventQueue +from eventkit.queues.direct import DirectQueue +from eventkit.queues.factory import create_queue + +__all__ = ["EventQueue", "DirectQueue", "AsyncQueue", "create_queue"] diff --git a/src/eventkit/queues/async_queue.py b/src/eventkit/queues/async_queue.py new file mode 100644 index 0000000..ac2a93c --- /dev/null +++ b/src/eventkit/queues/async_queue.py @@ -0,0 +1,162 @@ +""" +AsyncQueue - In-process event processing with background workers. + +This queue implementation uses asyncio.Queue to decouple API request handling +from event processing. Events are enqueued quickly, then processed by background +workers in parallel. + +Use Case: +- Single-server deployment with concurrent processing +- Decouple API response time from processing time +- Parallel processing with configurable workers + +Trade-offs: +- ✅ Fast API responses (enqueue is O(1)) +- ✅ Parallel processing (multiple workers) +- ✅ Simple: No external dependencies +- ❌ No persistence: Events lost on crash +- ❌ No horizontal scaling: Single process only + +Evolution Path: +- DirectQueue: Inline processing (no queue) +- AsyncQueue: In-process workers (this file) +- PubSubQueue: Distributed workers (future) +""" + +import asyncio +import logging +from typing import TYPE_CHECKING + +from eventkit.schema.raw import RawEvent + +if TYPE_CHECKING: + from eventkit.processing.processor import Processor + +logger = logging.getLogger(__name__) + + +class AsyncQueue: + """ + In-process queue with background workers. + + Events are added to an asyncio.Queue, then processed by N worker tasks + running in the background. This decouples API request handling from + event processing. + + Example: + >>> from eventkit.processing.processor import Processor + >>> from eventkit.queues.async_queue import AsyncQueue + >>> + >>> processor = Processor(adapter, sequencer, buffer, error_store) + >>> queue = AsyncQueue(processor, num_workers=4) + >>> + >>> await queue.start() + >>> await queue.enqueue(raw_event) # Returns immediately + >>> # Workers process in background + >>> await queue.stop() # Drains queue before stopping + """ + + def __init__(self, processor: "Processor", num_workers: int = 4): + """ + Initialize AsyncQueue. + + Args: + processor: The processor that will handle events + num_workers: Number of background worker tasks (default: 4) + """ + self.processor = processor + self.num_workers = num_workers + self.queue: asyncio.Queue[RawEvent] = asyncio.Queue() + self.workers: list[asyncio.Task[None]] = [] + self._stop_event = asyncio.Event() + + async def enqueue(self, event: RawEvent) -> None: + """ + Add event to queue (non-blocking). + + Returns immediately after adding to queue. Background workers + will process the event asynchronously. + + Args: + event: The raw event to process + """ + await self.queue.put(event) + logger.debug(f"Event enqueued: queue_size={self.queue.qsize()}") + + async def start(self) -> None: + """ + Start background workers and processor. 
+ + Creates N worker tasks that continuously pull from the queue + and process events. + """ + # Start processor (starts buffer flusher) + await self.processor.start() + + # Start worker tasks + self._stop_event.clear() + for i in range(self.num_workers): + worker = asyncio.create_task(self._worker(worker_id=i)) + self.workers.append(worker) + + logger.info(f"AsyncQueue started with {self.num_workers} workers") + + async def stop(self) -> None: + """ + Gracefully shutdown: drain queue, stop workers, stop processor. + + 1. Wait for queue to drain (process remaining events) + 2. Signal workers to stop + 3. Wait for workers to finish + 4. Stop processor (flush buffers) + """ + logger.info("AsyncQueue stopping - draining queue") + + # Wait for queue to drain + await self.queue.join() + + # Signal workers to stop + self._stop_event.set() + + # Wait for all workers to finish + await asyncio.gather(*self.workers, return_exceptions=True) + self.workers.clear() + + # Stop processor (flush buffers) + await self.processor.stop() + + logger.info("AsyncQueue stopped") + + async def _worker(self, worker_id: int) -> None: + """ + Background worker that processes events from queue. + + Continuously pulls events from queue and calls processor.process_event() + until stop signal is received. + + Args: + worker_id: ID for logging purposes + """ + logger.debug(f"Worker {worker_id} started") + + while not self._stop_event.is_set(): + try: + # Wait for event with timeout (check stop signal periodically) + event = await asyncio.wait_for(self.queue.get(), timeout=0.1) + + # Process event + try: + await self.processor.process_event(event) + except Exception as e: + logger.error(f"Worker {worker_id} error processing event: {e}") + finally: + # Mark task as done (for queue.join()) + self.queue.task_done() + + except TimeoutError: + # No event available, check stop signal and continue + continue + except Exception as e: + logger.error(f"Worker {worker_id} unexpected error: {e}") + + logger.debug(f"Worker {worker_id} stopped") diff --git a/src/eventkit/queues/base.py b/src/eventkit/queues/base.py new file mode 100644 index 0000000..184c994 --- /dev/null +++ b/src/eventkit/queues/base.py @@ -0,0 +1,80 @@ +""" +EventQueue Protocol - Interface for all queue implementations. + +This protocol defines the contract that all queue backends must implement, +enabling easy swapping between DirectQueue (inline), AsyncQueue (in-process), +and PubSubQueue (distributed). + +Design Philosophy: +- Queue knows about Processor (calls process_event) +- Processor doesn't know about Queue (queue-agnostic) +- Lifecycle management (start/stop) in queue interface +""" + +from typing import Protocol + +from eventkit.schema.raw import RawEvent + + +class EventQueue(Protocol): + """ + Protocol for event queue implementations. + + All queue backends must implement this interface to be pluggable. + + Implementations: + - DirectQueue: Inline processing (no actual queue) + - AsyncQueue: In-process workers with asyncio.Queue + - PubSubQueue: Distributed workers with GCP Pub/Sub (future) + + Example: + >>> # Queue is created by factory based on config + >>> queue = create_queue(processor, settings) + >>> + >>> # Lifecycle + >>> await queue.start() + >>> + >>> # Process events (API calls this) + >>> await queue.enqueue(raw_event) + >>> + >>> # Shutdown + >>> await queue.stop() + """ + + async def enqueue(self, event: RawEvent) -> None: + """ + Add event to queue for processing. 
+ + For DirectQueue: Processes immediately (inline) + For AsyncQueue: Adds to asyncio.Queue + For PubSubQueue: Publishes to GCP Pub/Sub + + Args: + event: RawEvent to process + + Raises: + QueueFullError: If queue is full and cannot accept events + """ + ... + + async def start(self) -> None: + """ + Start queue and processor. + + For DirectQueue: Starts processor buffer flusher + For AsyncQueue: Starts processor + worker tasks + For PubSubQueue: Starts processor + Pub/Sub subscriber + """ + ... + + async def stop(self) -> None: + """ + Stop queue gracefully. + + Ensures all queued events are processed before shutdown. + + For DirectQueue: Stops processor (flushes buffer) + For AsyncQueue: Drains queue, stops workers, stops processor + For PubSubQueue: Waits for in-flight acks, stops processor + """ + ... diff --git a/src/eventkit/queues/direct.py b/src/eventkit/queues/direct.py new file mode 100644 index 0000000..0aaccd7 --- /dev/null +++ b/src/eventkit/queues/direct.py @@ -0,0 +1,86 @@ +""" +DirectQueue - Inline event processing (no actual queue). + +This is the simplest queue implementation. Events are processed +immediately when enqueue() is called, with no queueing or background workers. + +Use Case: +- Single-server deployment +- Development and testing +- Getting started quickly + +Trade-offs: +- ✅ Simple: No external dependencies +- ✅ Fast: No queue overhead +- ❌ No decoupling: API and processing in same thread +- ❌ No horizontal scaling: Can't add more workers + +Evolution Path: +- DirectQueue: Inline processing (this file) +- AsyncQueue: In-process workers with background tasks +- PubSubQueue: Distributed workers via GCP Pub/Sub (future) +""" + +from typing import TYPE_CHECKING + +from eventkit.schema.raw import RawEvent + +if TYPE_CHECKING: + from eventkit.processing.processor import Processor + + +class DirectQueue: + """ + Direct (inline) queue implementation. + + Processes events immediately without actual queueing. The "queue" + is just a thin wrapper that calls processor.process_event() directly. + + Example: + >>> from eventkit.processing.processor import Processor + >>> from eventkit.queues.direct import DirectQueue + >>> + >>> processor = Processor(adapter, sequencer, buffer, error_store) + >>> queue = DirectQueue(processor) + >>> + >>> await queue.start() + >>> await queue.enqueue(raw_event) # Processes immediately + >>> await queue.stop() + """ + + def __init__(self, processor: "Processor") -> None: # noqa: F821 + """ + Initialize DirectQueue. + + Args: + processor: Processor instance that handles event processing + """ + self.processor = processor + + async def enqueue(self, event: RawEvent) -> None: + """ + Process event immediately (inline). + + No actual queueing - just calls processor.process_event() directly. + + Args: + event: RawEvent to process + """ + await self.processor.process_event(event) + + async def start(self) -> None: + """ + Start processor. + + For DirectQueue, this just starts the processor's background tasks + (e.g., buffer flusher). + """ + await self.processor.start() + + async def stop(self) -> None: + """ + Stop processor gracefully. + + Ensures buffer is flushed before shutdown. + """ + await self.processor.stop() diff --git a/src/eventkit/queues/factory.py b/src/eventkit/queues/factory.py new file mode 100644 index 0000000..170ff59 --- /dev/null +++ b/src/eventkit/queues/factory.py @@ -0,0 +1,72 @@ +""" +Queue Factory - Creates queue instances based on configuration. 
+ +This factory enables swapping queue implementations via environment variable +(EVENTKIT_QUEUE_MODE) without changing application code. + +Supported Modes: +- direct: DirectQueue (inline processing) - default +- async: AsyncQueue (in-process workers) +- pubsub: PubSubQueue (distributed workers) - future +""" + +from typing import TYPE_CHECKING + +from eventkit.config import QueueMode, Settings +from eventkit.queues.base import EventQueue +from eventkit.queues.direct import DirectQueue + +if TYPE_CHECKING: + from eventkit.processing.processor import Processor + + +def create_queue(processor: "Processor", settings: Settings) -> EventQueue: + """ + Create queue instance based on settings.queue_mode. + + This factory pattern enables easy swapping of queue implementations + via configuration. The processor doesn't know which queue is being used. + + Args: + processor: Processor instance (queue-agnostic) + settings: Settings with queue_mode configuration + + Returns: + EventQueue implementation (DirectQueue, AsyncQueue, or PubSubQueue) + + Raises: + ValueError: If queue_mode is unknown + + Example: + >>> from eventkit.config import Settings + >>> from eventkit.processing.processor import Processor + >>> from eventkit.queues.factory import create_queue + >>> + >>> settings = Settings() # Reads EVENTKIT_QUEUE_MODE from env + >>> processor = Processor(...) + >>> queue = create_queue(processor, settings) + >>> + >>> await queue.start() + >>> await queue.enqueue(raw_event) + >>> await queue.stop() + """ + + if settings.EVENTKIT_QUEUE_MODE == QueueMode.DIRECT: + return DirectQueue(processor) + + elif settings.EVENTKIT_QUEUE_MODE == QueueMode.ASYNC: + from eventkit.queues.async_queue import AsyncQueue + + return AsyncQueue(processor, num_workers=settings.EVENTKIT_ASYNC_WORKERS) + + elif settings.EVENTKIT_QUEUE_MODE == QueueMode.PUBSUB: + # Future: PubSubQueue implementation + raise NotImplementedError( + "PubSubQueue not yet implemented. Set EVENTKIT_QUEUE_MODE=direct to use DirectQueue." + ) + + else: + raise ValueError( + f"Unknown queue mode: {settings.EVENTKIT_QUEUE_MODE}. 
" + f"Supported modes: {[mode.value for mode in QueueMode]}" + ) diff --git a/tests/unit/processing/test_processor.py b/tests/unit/processing/test_processor.py new file mode 100644 index 0000000..d914316 --- /dev/null +++ b/tests/unit/processing/test_processor.py @@ -0,0 +1,193 @@ +"""Tests for Processor.""" + +from datetime import UTC, datetime +from unittest.mock import AsyncMock, Mock + +import pytest + +from eventkit.adapters.base import AdapterResult +from eventkit.processing.processor import Processor +from eventkit.schema.events import IdentifyEvent +from eventkit.schema.raw import RawEvent + + +@pytest.mark.asyncio +class TestProcessor: + """Test Processor orchestration.""" + + async def test_process_event_success(self): + """Processor should adapt, sequence, and buffer valid events.""" + # Setup mocks + mock_adapter = Mock() + typed_event = IdentifyEvent( + userId="user123", + timestamp=datetime.now(UTC), + traits={"email": "user@example.com"}, + ) + mock_adapter.adapt = Mock(return_value=AdapterResult.success(typed_event)) + + mock_sequencer = Mock() + mock_sequencer.get_partition_id = Mock(return_value=7) + + mock_buffer = Mock() + mock_buffer.enqueue = AsyncMock() + + mock_error_store = Mock() + + processor = Processor( + adapter=mock_adapter, + sequencer=mock_sequencer, + buffer=mock_buffer, + error_store=mock_error_store, + ) + + raw_event = RawEvent(payload={"type": "identify", "userId": "user123"}) + + # Execute + await processor.process_event(raw_event) + + # Verify + mock_adapter.adapt.assert_called_once_with(raw_event) + mock_sequencer.get_partition_id.assert_called_once_with(typed_event) + mock_buffer.enqueue.assert_called_once_with(typed_event, 7) + mock_error_store.store_error.assert_not_called() + + async def test_process_event_invalid(self): + """Processor should store invalid events in error store.""" + # Setup mocks + mock_adapter = Mock() + mock_adapter.adapt = Mock(return_value=AdapterResult.failure("Missing userId")) + + mock_sequencer = Mock() + mock_buffer = Mock() + + mock_error_store = Mock() + mock_error_store.store_error = AsyncMock() + + processor = Processor( + adapter=mock_adapter, + sequencer=mock_sequencer, + buffer=mock_buffer, + error_store=mock_error_store, + ) + + raw_event = RawEvent(payload={"type": "identify"}, stream="users") + + # Execute + await processor.process_event(raw_event) + + # Verify + mock_adapter.adapt.assert_called_once_with(raw_event) + mock_error_store.store_error.assert_called_once() + + # Verify error store call + call_args = mock_error_store.store_error.call_args + assert call_args.kwargs["payload"] == raw_event.payload + assert call_args.kwargs["error"] == "Missing userId" + assert call_args.kwargs["metadata"] == {"stream": "users"} + assert isinstance(call_args.kwargs["timestamp"], datetime) + + # Verify sequencer and buffer NOT called + mock_sequencer.get_partition_id.assert_not_called() + mock_buffer.enqueue.assert_not_called() + + async def test_process_event_partitioning(self): + """Processor should route events to correct partition.""" + # Setup mocks + mock_adapter = Mock() + + # Create different typed events + events_and_partitions = [ + (IdentifyEvent(userId="user1", timestamp=datetime.now(UTC), traits={}), 3), + (IdentifyEvent(userId="user2", timestamp=datetime.now(UTC), traits={}), 7), + (IdentifyEvent(userId="user3", timestamp=datetime.now(UTC), traits={}), 12), + ] + + mock_sequencer = Mock() + mock_buffer = Mock() + mock_buffer.enqueue = AsyncMock() + mock_error_store = Mock() + + processor = Processor( + 
adapter=mock_adapter, + sequencer=mock_sequencer, + buffer=mock_buffer, + error_store=mock_error_store, + ) + + # Execute for each event + for typed_event, partition_id in events_and_partitions: + mock_adapter.adapt = Mock(return_value=AdapterResult.success(typed_event)) + mock_sequencer.get_partition_id = Mock(return_value=partition_id) + + raw_event = RawEvent(payload={"type": "identify", "user_id": typed_event.user_id}) + await processor.process_event(raw_event) + + # Verify all events buffered to correct partitions + assert mock_buffer.enqueue.call_count == 3 + for i, (typed_event, partition_id) in enumerate(events_and_partitions): + call_args = mock_buffer.enqueue.call_args_list[i] + assert call_args[0][0].user_id == typed_event.user_id + assert call_args[0][1] == partition_id + + async def test_start_lifecycle(self): + """Processor.start() should start buffer flusher.""" + # Setup + mock_buffer = Mock() + mock_buffer.start_flusher = AsyncMock() + + processor = Processor( + adapter=Mock(), + sequencer=Mock(), + buffer=mock_buffer, + error_store=Mock(), + ) + + # Execute + await processor.start() + + # Verify + mock_buffer.start_flusher.assert_called_once() + + async def test_stop_lifecycle(self): + """Processor.stop() should stop buffer gracefully.""" + # Setup + mock_buffer = Mock() + mock_buffer.stop_flusher = AsyncMock() + + processor = Processor( + adapter=Mock(), + sequencer=Mock(), + buffer=mock_buffer, + error_store=Mock(), + ) + + # Execute + await processor.stop() + + # Verify + mock_buffer.stop_flusher.assert_called_once() + + async def test_no_exceptions_on_invalid_events(self): + """Processor should never throw exceptions for invalid events.""" + # Setup + mock_adapter = Mock() + mock_adapter.adapt = Mock(return_value=AdapterResult.failure("Invalid")) + + mock_error_store = Mock() + mock_error_store.store_error = AsyncMock() + + processor = Processor( + adapter=mock_adapter, + sequencer=Mock(), + buffer=Mock(), + error_store=mock_error_store, + ) + + raw_event = RawEvent(payload={"bad": "event"}) + + # Execute - should not raise + await processor.process_event(raw_event) + + # Verify error was stored + mock_error_store.store_error.assert_called_once() diff --git a/tests/unit/queues/test_async.py b/tests/unit/queues/test_async.py new file mode 100644 index 0000000..7009c84 --- /dev/null +++ b/tests/unit/queues/test_async.py @@ -0,0 +1,186 @@ +"""Tests for AsyncQueue.""" + +import asyncio +from unittest.mock import AsyncMock, Mock + +import pytest + +from eventkit.queues.async_queue import AsyncQueue +from eventkit.schema.raw import RawEvent + + +@pytest.mark.asyncio +class TestAsyncQueue: + """Test AsyncQueue implementation.""" + + async def test_enqueue_adds_to_queue(self): + """AsyncQueue.enqueue() should add event to internal queue.""" + # Setup + mock_processor = Mock() + queue = AsyncQueue(mock_processor, num_workers=1) + + raw_event = RawEvent(payload={"type": "identify", "userId": "123"}) + + # Execute + await queue.enqueue(raw_event) + + # Verify + assert queue.queue.qsize() == 1 + queued_event = await queue.queue.get() + assert queued_event == raw_event + + async def test_workers_process_events(self): + """AsyncQueue workers should process events from queue.""" + # Setup + mock_processor = Mock() + mock_processor.start = AsyncMock() + mock_processor.stop = AsyncMock() + mock_processor.process_event = AsyncMock() + + queue = AsyncQueue(mock_processor, num_workers=2) + await queue.start() + + # Execute - enqueue events + events = [ + RawEvent(payload={"type": 
"identify", "userId": "user1"}), + RawEvent(payload={"type": "track", "event": "Click"}), + RawEvent(payload={"type": "page", "name": "/home"}), + ] + + for event in events: + await queue.enqueue(event) + + # Wait for processing (give workers time to pick up events) + await asyncio.sleep(0.2) + + # Verify + assert mock_processor.process_event.call_count == 3 + + # Cleanup + await queue.stop() + + async def test_graceful_shutdown_drains_queue(self): + """AsyncQueue.stop() should process all queued events before stopping.""" + # Setup + mock_processor = Mock() + mock_processor.start = AsyncMock() + mock_processor.stop = AsyncMock() + + # Add delay to process_event to simulate work + async def delayed_process(event): + await asyncio.sleep(0.05) + + mock_processor.process_event = AsyncMock(side_effect=delayed_process) + + queue = AsyncQueue(mock_processor, num_workers=1) + await queue.start() + + # Enqueue multiple events + events = [RawEvent(payload={"type": "track", "event": f"Event{i}"}) for i in range(5)] + for event in events: + await queue.enqueue(event) + + # Execute - stop should drain queue + await queue.stop() + + # Verify all events processed + assert mock_processor.process_event.call_count == 5 + assert queue.queue.qsize() == 0 + + async def test_multiple_workers_parallel_processing(self): + """Multiple workers should process events in parallel.""" + # Setup + mock_processor = Mock() + mock_processor.start = AsyncMock() + mock_processor.stop = AsyncMock() + + # Track which worker processed which event (simulate work) + processed_events = [] + + async def track_process(event): + processed_events.append(event) + await asyncio.sleep(0.05) # Simulate processing time + + mock_processor.process_event = AsyncMock(side_effect=track_process) + + # Use 3 workers + queue = AsyncQueue(mock_processor, num_workers=3) + await queue.start() + + # Enqueue 6 events + events = [RawEvent(payload={"type": "track", "event": f"Event{i}"}) for i in range(6)] + for event in events: + await queue.enqueue(event) + + # Wait for processing + await asyncio.sleep(0.3) + + # Verify all events processed + assert len(processed_events) == 6 + + # Cleanup + await queue.stop() + + async def test_start_stops_processor(self): + """AsyncQueue.start() should start processor.""" + # Setup + mock_processor = Mock() + mock_processor.start = AsyncMock() + mock_processor.stop = AsyncMock() + + queue = AsyncQueue(mock_processor, num_workers=1) + + # Execute + await queue.start() + await queue.stop() + + # Verify + mock_processor.start.assert_called_once() + mock_processor.stop.assert_called_once() + + async def test_worker_error_handling(self): + """Workers should continue processing even if one event fails.""" + # Setup + mock_processor = Mock() + mock_processor.start = AsyncMock() + mock_processor.stop = AsyncMock() + + # First event fails, rest succeed + call_count = 0 + + async def process_with_error(event): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise ValueError("Processing error") + + mock_processor.process_event = AsyncMock(side_effect=process_with_error) + + queue = AsyncQueue(mock_processor, num_workers=1) + await queue.start() + + # Enqueue events (first will fail, second should still process) + await queue.enqueue(RawEvent(payload={"type": "track", "event": "Event1"})) + await queue.enqueue(RawEvent(payload={"type": "track", "event": "Event2"})) + + # Wait for processing with longer timeout + await asyncio.sleep(0.5) + + # Verify both events were attempted + assert 
mock_processor.process_event.call_count == 2 + + # Cleanup + await queue.stop() + + async def test_queue_size_tracking(self): + """Queue size should be trackable.""" + # Setup + mock_processor = Mock() + queue = AsyncQueue(mock_processor, num_workers=1) + + # Enqueue events + for i in range(3): + await queue.enqueue(RawEvent(payload={"type": "track", "event": f"Event{i}"})) + + # Verify queue size + assert queue.queue.qsize() == 3 diff --git a/tests/unit/queues/test_direct.py b/tests/unit/queues/test_direct.py new file mode 100644 index 0000000..1e36e58 --- /dev/null +++ b/tests/unit/queues/test_direct.py @@ -0,0 +1,76 @@ +"""Tests for DirectQueue.""" + +from unittest.mock import AsyncMock, Mock + +import pytest + +from eventkit.queues.direct import DirectQueue +from eventkit.schema.raw import RawEvent + + +@pytest.mark.asyncio +class TestDirectQueue: + """Test DirectQueue implementation.""" + + async def test_enqueue_calls_processor(self): + """DirectQueue.enqueue() should call processor.process_event().""" + # Setup + mock_processor = Mock() + mock_processor.process_event = AsyncMock() + queue = DirectQueue(mock_processor) + + raw_event = RawEvent(payload={"type": "identify", "userId": "123"}) + + # Execute + await queue.enqueue(raw_event) + + # Verify + mock_processor.process_event.assert_called_once_with(raw_event) + + async def test_start_calls_processor_start(self): + """DirectQueue.start() should call processor.start().""" + # Setup + mock_processor = Mock() + mock_processor.start = AsyncMock() + queue = DirectQueue(mock_processor) + + # Execute + await queue.start() + + # Verify + mock_processor.start.assert_called_once() + + async def test_stop_calls_processor_stop(self): + """DirectQueue.stop() should call processor.stop().""" + # Setup + mock_processor = Mock() + mock_processor.stop = AsyncMock() + queue = DirectQueue(mock_processor) + + # Execute + await queue.stop() + + # Verify + mock_processor.stop.assert_called_once() + + async def test_multiple_enqueues(self): + """DirectQueue should process multiple events in sequence.""" + # Setup + mock_processor = Mock() + mock_processor.process_event = AsyncMock() + queue = DirectQueue(mock_processor) + + events = [ + RawEvent(payload={"type": "identify", "userId": "user1"}), + RawEvent(payload={"type": "track", "event": "Click"}), + RawEvent(payload={"type": "page", "name": "/home"}), + ] + + # Execute + for event in events: + await queue.enqueue(event) + + # Verify + assert mock_processor.process_event.call_count == 3 + for i, event in enumerate(events): + assert mock_processor.process_event.call_args_list[i][0][0] == event diff --git a/tests/unit/queues/test_factory.py b/tests/unit/queues/test_factory.py new file mode 100644 index 0000000..0172f01 --- /dev/null +++ b/tests/unit/queues/test_factory.py @@ -0,0 +1,68 @@ +"""Tests for queue factory.""" + +from unittest.mock import Mock + +import pytest + +from eventkit.config import QueueMode, Settings +from eventkit.queues.async_queue import AsyncQueue +from eventkit.queues.direct import DirectQueue +from eventkit.queues.factory import create_queue + + +class TestQueueFactory: + """Test queue factory.""" + + def test_create_direct_queue(self): + """Factory should create DirectQueue when mode is DIRECT.""" + # Setup + settings = Settings(GCP_PROJECT_ID="test-project", EVENTKIT_QUEUE_MODE=QueueMode.DIRECT) + mock_processor = Mock() + + # Execute + queue = create_queue(mock_processor, settings) + + # Verify + assert isinstance(queue, DirectQueue) + assert queue.processor == 
mock_processor + + def test_create_async_queue(self): + """Factory should create AsyncQueue when mode is ASYNC.""" + # Setup + settings = Settings( + GCP_PROJECT_ID="test-project", + EVENTKIT_QUEUE_MODE=QueueMode.ASYNC, + EVENTKIT_ASYNC_WORKERS=8, + ) + mock_processor = Mock() + + # Execute + queue = create_queue(mock_processor, settings) + + # Verify + assert isinstance(queue, AsyncQueue) + assert queue.processor == mock_processor + assert queue.num_workers == 8 + + def test_async_queue_default_workers(self): + """Factory should use default worker count if not specified.""" + # Setup + settings = Settings(GCP_PROJECT_ID="test-project", EVENTKIT_QUEUE_MODE=QueueMode.ASYNC) + mock_processor = Mock() + + # Execute + queue = create_queue(mock_processor, settings) + + # Verify + assert isinstance(queue, AsyncQueue) + assert queue.num_workers == 4 # Default from config + + def test_pubsub_queue_not_implemented(self): + """Factory should raise NotImplementedError for PUBSUB mode.""" + # Setup + settings = Settings(GCP_PROJECT_ID="test-project", EVENTKIT_QUEUE_MODE=QueueMode.PUBSUB) + mock_processor = Mock() + + # Execute & Verify + with pytest.raises(NotImplementedError, match="PubSubQueue not yet implemented"): + create_queue(mock_processor, settings)
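
For reference, a minimal wiring sketch of the API surface these tests exercise: Settings, Processor, create_queue, and the AsyncQueue start/enqueue/stop lifecycle. It reuses the same mock collaborators as test_processor.py in place of the concrete adapter, sequencer, buffer, and error store (none of which appear in this patch), so treat it as an illustration of the tested behavior rather than production wiring; the project id "demo-project" and partition number 7 are arbitrary placeholders.

```python
import asyncio
from datetime import UTC, datetime
from unittest.mock import AsyncMock, Mock

from eventkit.adapters.base import AdapterResult
from eventkit.config import QueueMode, Settings
from eventkit.processing.processor import Processor
from eventkit.queues.factory import create_queue
from eventkit.schema.events import IdentifyEvent
from eventkit.schema.raw import RawEvent


async def main() -> None:
    # Settings as constructed in the factory tests; the project id is a placeholder.
    settings = Settings(
        GCP_PROJECT_ID="demo-project",
        EVENTKIT_QUEUE_MODE=QueueMode.ASYNC,
        EVENTKIT_ASYNC_WORKERS=4,
    )

    # Stand-in collaborators, mirroring the mocks used in test_processor.py;
    # a real deployment would supply the concrete adapter/sequencer/buffer/error store.
    typed = IdentifyEvent(userId="user123", timestamp=datetime.now(UTC), traits={})
    adapter = Mock(adapt=Mock(return_value=AdapterResult.success(typed)))
    sequencer = Mock(get_partition_id=Mock(return_value=7))
    buffer = Mock(enqueue=AsyncMock(), start_flusher=AsyncMock(), stop_flusher=AsyncMock())
    error_store = Mock(store_error=AsyncMock())

    processor = Processor(
        adapter=adapter, sequencer=sequencer, buffer=buffer, error_store=error_store
    )
    queue = create_queue(processor, settings)  # AsyncQueue when mode is ASYNC

    await queue.start()
    await queue.enqueue(
        RawEvent(payload={"type": "identify", "userId": "user123"}, stream="users")
    )
    await queue.stop()  # graceful shutdown drains queued events before returning

    assert buffer.enqueue.await_count == 1  # the event reached the buffer via partition 7


if __name__ == "__main__":
    asyncio.run(main())
```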