87 changes: 87 additions & 0 deletions CLAUDE.md
@@ -56,6 +56,93 @@ Use [conventional commits](https://www.conventionalcommits.org/):

Keep first line under 72 characters.

## Logging

### Using Structured Logging

Use `structlog.get_logger(__name__)` in all modules:

```python
import structlog

logger = structlog.get_logger(__name__)

# Good: structured key-value pairs
logger.info("event_received", stream="users", event_type="identify")

# Bad: string interpolation
logger.info(f"Received {event_type} event from {stream}") # Don't do this!
```

### What to Log

**Always Log (INFO level)**:
- API requests: `method`, `path`, `status_code`, `duration_ms`
- Events received: `stream`, `event_type` (NOT full payload)
- Buffer flushes: `partition`, `event_count`, `duration_ms`
- Store writes: `event_count`, `duration_ms`

**Sometimes Log (DEBUG level)**:
- Event adaptation details
- Sequencer routing: `event_id`, `partition`
- Worker lifecycle changes

**Always Log (WARNING level)**:
- Validation failures: `error_type`, `error` (NOT full event)
- Retries: `operation`, `attempt`, `reason`

**Always Log (ERROR level)**:
- Store failures: `error`, `event_count`
- Ring buffer failures: `error`, `current_size`
- Worker crashes with stack traces (`exc_info=True`)
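
As a concrete illustration, the guidelines above might translate into code like this (event names and values are illustrative, not taken verbatim from the eventkit source):

```python
import structlog

logger = structlog.get_logger(__name__)

# INFO: routine operations, key metrics only
logger.info("buffer_flush_complete", partition=3, event_count=250, duration_ms=12.4)

# DEBUG: fine-grained routing detail
logger.debug("event_sequenced", event_id="evt_123", partition=7)

# WARNING: recoverable problems, without the offending payload
logger.warning("validation_failed", error_type="SchemaError", error="missing 'type' field")

# ERROR: unrecoverable failures, with a traceback attached
try:
    raise RuntimeError("store unavailable")
except RuntimeError as exc:
    logger.error("store_write_failed", error=str(exc), event_count=250, exc_info=True)
```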

### What NOT to Log

**Never log**:
- Full event payloads (PII risk)
- User identifiers in plaintext
- Auth tokens or secrets
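
If a user reference is genuinely needed for debugging, one option (a suggestion, not something eventkit currently prescribes) is to log a truncated hash rather than the raw identifier:

```python
import hashlib

import structlog

logger = structlog.get_logger(__name__)


def user_fingerprint(user_id: str) -> str:
    # Hash instead of plaintext so the raw identifier never appears in logs.
    return hashlib.sha256(user_id.encode("utf-8")).hexdigest()[:12]


logger.info(
    "event_received",
    stream="users",
    event_type="identify",
    user_id_hash=user_fingerprint("user_123"),
)
```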

### Context Propagation

Use `structlog.contextvars` for automatic context in all logs:

```python
import structlog

# At API entry point (request handler)
structlog.contextvars.bind_contextvars(
request_id=str(uuid.uuid4()),
stream=stream,
)

# Anywhere in the call chain (automatically includes request_id and stream)
logger.info("event_adapted", event_type="identify")

# Clear context after request
structlog.contextvars.clear_contextvars()
```

### Performance Timing

Log performance metrics for slow operations:

```python
import time

start_time = time.perf_counter()

# ... operation ...

duration_ms = (time.perf_counter() - start_time) * 1000
logger.info(
"buffer_flush_complete",
partition=partition_id,
event_count=len(events),
duration_ms=round(duration_ms, 2),
)
```

## Critical Patterns

### Queue-Agnostic Processor
74 changes: 74 additions & 0 deletions LOCAL_DEV.md
@@ -165,6 +165,80 @@ API → ring buffer (durable) → publisher → queue → workers → Firestore

---

## Logging

eventkit uses structured logging via `structlog` for production observability.

### Development (Pretty Logs)

By default, logs are colored and human-readable:

```bash
export EVENTKIT_LOG_LEVEL="DEBUG" # or INFO (default), WARNING, ERROR
export EVENTKIT_JSON_LOGS="false" # default

uv run uvicorn eventkit.api.app:app --reload --port 8000
```

Example output:
```
2026-01-12T10:30:45.123Z [info ] request_received method=POST path=/collect/users event_type=identify request_id=abc-123 stream=users
2026-01-12T10:30:45.125Z [debug ] event_received event_type=identify stream=users
2026-01-12T10:30:45.126Z [debug ] event_adapted event_type=identify
2026-01-12T10:30:45.127Z [debug ] event_sequenced partition=7
2026-01-12T10:30:45.128Z [info ] request_completed status_code=202 duration_ms=5.23 request_id=abc-123 stream=users
```

### Production (JSON Logs)

For log aggregation systems (e.g., GCP Logging, Datadog):

```bash
export EVENTKIT_LOG_LEVEL="INFO"
export EVENTKIT_JSON_LOGS="true"

uv run uvicorn eventkit.api.app:app --port 8000
```

Example output (one JSON object per line):
```json
{"event": "request_received", "method": "POST", "path": "/collect/users", "event_type": "identify", "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.123Z", "logger": "eventkit.api.router"}
{"event": "request_completed", "status_code": 202, "duration_ms": 5.23, "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.128Z", "logger": "eventkit.api.router"}
```
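
The switch between the two formats is handled by `configure_logging` (exported from `eventkit.logging`). Its implementation is not shown here; the following is a minimal sketch of how such a function is typically built with structlog, assuming the standard processor chain (the real module may add extras, e.g. emitting the logger name):

```python
import logging

import structlog


def configure_logging(json_logs: bool = False, log_level: str = "INFO") -> None:
    """Sketch only: choose a renderer based on an EVENTKIT_JSON_LOGS-style flag."""
    renderer = (
        structlog.processors.JSONRenderer()   # one JSON object per line
        if json_logs
        else structlog.dev.ConsoleRenderer()  # colored, human-readable
    )
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,      # request_id, stream, ...
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,         # render exc_info=True tracebacks
            renderer,
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper(), logging.INFO)
        ),
        cache_logger_on_first_use=True,
    )
```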

### Log Levels

| Level | When to Use | Example Operations |
|-------|-------------|-------------------|
| **DEBUG** | Development debugging | Event adaptation details, sequencer routing |
| **INFO** | Normal operations | API requests, buffer flushes, store writes |
| **WARNING** | Recoverable errors | Validation failures, retries |
| **ERROR** | Unrecoverable errors | Store failures, ring buffer failures |

### What's Logged

**Always Logged:**
- API requests: method, path, status_code, duration_ms
- Events: stream, event_type (NOT full payload for PII safety)
- Buffer flushes: partition, event_count, duration_ms
- Store writes: event_count, duration_ms

**Never Logged:**
- Full event payloads (PII risk)
- User identifiers in plaintext
- Auth tokens or secrets

### Context Propagation

All logs within a request include the same `request_id` and `stream` for tracing:

```bash
# Example: Trace a single event through the entire pipeline
cat logs.json | jq 'select(.request_id == "abc-123")'
```

---

## Stopping Services

```bash
22 changes: 19 additions & 3 deletions src/eventkit/api/app.py
@@ -4,10 +4,13 @@
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import structlog
from fastapi import FastAPI

from eventkit.api.dependencies import get_queue
from eventkit.api.router import router
from eventkit.config import Settings
from eventkit.logging import configure_logging

logger = logging.getLogger(__name__)

@@ -18,6 +21,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
Manage application lifecycle.

Startup:
- Configure logging (structlog with JSON/console formatters)
- Start EventQueue (which starts processor, buffer flusher, and ring buffer publisher)
- Ring buffer publisher begins moving events from ring buffer to internal queue
- Queue workers begin processing events
@@ -40,24 +44,36 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
Yields:
Control to the application during its lifetime
"""
# Configure logging
settings = Settings() # type: ignore[call-arg] # Settings requires GCP_PROJECT_ID from env
configure_logging(
json_logs=settings.EVENTKIT_JSON_LOGS,
log_level=settings.EVENTKIT_LOG_LEVEL,
)

# Get structured logger after configuration
app_logger = structlog.get_logger(__name__)

# Startup - queue manages ring buffer, publisher, workers, and processor
queue = get_queue()
await queue.start()

logger.info("Application started - ring buffer + queue active")
app_logger.info("application_started", mode=settings.EVENTKIT_QUEUE_MODE.value)

yield

# Shutdown - gracefully drain ring buffer and queue
logger.info("Application shutting down - draining ring buffer and queue")
app_logger.info("application_shutting_down")
await queue.stop()
logger.info("Application stopped")
app_logger.info("application_stopped")


def create_app() -> FastAPI:
"""
Create and configure FastAPI application.

Logging is configured in the lifespan function (on startup).

Returns:
Configured FastAPI application instance

43 changes: 38 additions & 5 deletions src/eventkit/api/router.py
@@ -1,7 +1,9 @@
"""API router for event collection endpoints."""

import logging
import time
import uuid

import structlog
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse

@@ -10,7 +12,7 @@
from eventkit.schema.raw import RawEvent
from eventkit.stores.event_store import EventStore

logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
router = APIRouter()


@@ -55,20 +57,48 @@ async def collect(
POST /collect/users
{"type": "identify", "userId": "user_123"}
"""
# Bind context for this request (propagates to all logs in this call chain)
request_id = str(uuid.uuid4())
structlog.contextvars.bind_contextvars(
request_id=request_id,
stream=stream,
)

start_time = time.perf_counter()
payload = await request.json()

logger.info(
"request_received",
method="POST",
path=f"/collect/{stream}",
event_type=payload.get("type"),
)

raw_event = RawEvent(payload=payload, stream=stream)

try:
# Enqueue event (durable write via internal ring buffer)
await queue.enqueue(raw_event)

duration_ms = (time.perf_counter() - start_time) * 1000
logger.info(
"request_completed",
status_code=202,
duration_ms=round(duration_ms, 2),
)

return JSONResponse({"status": "accepted"}, status_code=202)

except Exception as e:
# Ring buffer write failed - catastrophic failure (disk full, corruption, etc.)
# Return 503 to tell client to retry later
logger.critical(
"Ring buffer write failed - service unavailable",
extra={"error": str(e), "stream": stream, "event_type": payload.get("type")},
duration_ms = (time.perf_counter() - start_time) * 1000
logger.error(
"request_failed",
error=str(e),
error_type=type(e).__name__,
status_code=503,
duration_ms=round(duration_ms, 2),
exc_info=True,
)
return JSONResponse(
@@ -78,6 +108,9 @@ async def collect(
},
status_code=503,
)
finally:
# Clear context after request
structlog.contextvars.clear_contextvars()


@router.post("/v1/identify")
3 changes: 2 additions & 1 deletion src/eventkit/config.py
@@ -83,4 +83,5 @@ class Settings(BaseSettings):
EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL: float = 3600.0 # Cleanup interval (seconds, 1 hour)

# Logging
LOG_LEVEL: str = "INFO"
EVENTKIT_LOG_LEVEL: str = "INFO" # Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
EVENTKIT_JSON_LOGS: bool = False # Use JSON formatter (True) or colored console (False)
5 changes: 5 additions & 0 deletions src/eventkit/logging/__init__.py
@@ -0,0 +1,5 @@
"""Structured logging configuration for eventkit."""

from eventkit.logging.config import configure_logging

__all__ = ["configure_logging"]