From 14b400c244528b246ec8b94c5a34daff2532f653 Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 12 Jan 2026 11:36:50 -0800 Subject: [PATCH 1/8] feat: configure structlog with dual formatters Add structured logging infrastructure with JSON and console formatters. Files Created: - src/eventkit/logging/__init__.py - src/eventkit/logging/config.py Configuration: - Dual formatters: JSON for prod, colored console for dev - Context propagation via contextvars - ISO timestamps, log level, logger name in all logs Settings Added: - EVENTKIT_LOG_LEVEL (default: INFO) - EVENTKIT_JSON_LOGS (default: False) Call configure_logging() in FastAPI app creation. --- src/eventkit/api/app.py | 9 ++++ src/eventkit/config.py | 3 +- src/eventkit/logging/__init__.py | 5 +++ src/eventkit/logging/config.py | 73 ++++++++++++++++++++++++++++++++ 4 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 src/eventkit/logging/__init__.py create mode 100644 src/eventkit/logging/config.py diff --git a/src/eventkit/api/app.py b/src/eventkit/api/app.py index e00c61d..f212f94 100644 --- a/src/eventkit/api/app.py +++ b/src/eventkit/api/app.py @@ -8,6 +8,8 @@ from eventkit.api.dependencies import get_queue from eventkit.api.router import router +from eventkit.config import Settings +from eventkit.logging import configure_logging logger = logging.getLogger(__name__) @@ -71,6 +73,13 @@ def create_app() -> FastAPI: app = create_app() uvicorn.run(app, host="0.0.0.0", port=8000) """ + # Configure logging based on settings + settings = Settings() # type: ignore[call-arg] # Settings requires GCP_PROJECT_ID from env + configure_logging( + json_logs=settings.EVENTKIT_JSON_LOGS, + log_level=settings.EVENTKIT_LOG_LEVEL, + ) + app = FastAPI( title="eventkit", description="Event processing pipeline for customer data", diff --git a/src/eventkit/config.py b/src/eventkit/config.py index 42faa36..e87d26c 100644 --- a/src/eventkit/config.py +++ b/src/eventkit/config.py @@ -83,4 +83,5 @@ class Settings(BaseSettings): EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL: float = 3600.0 # Cleanup interval (seconds, 1 hour) # Logging - LOG_LEVEL: str = "INFO" + EVENTKIT_LOG_LEVEL: str = "INFO" # Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + EVENTKIT_JSON_LOGS: bool = False # Use JSON formatter (True) or colored console (False) diff --git a/src/eventkit/logging/__init__.py b/src/eventkit/logging/__init__.py new file mode 100644 index 0000000..07017bd --- /dev/null +++ b/src/eventkit/logging/__init__.py @@ -0,0 +1,5 @@ +"""Structured logging configuration for eventkit.""" + +from eventkit.logging.config import configure_logging + +__all__ = ["configure_logging"] diff --git a/src/eventkit/logging/config.py b/src/eventkit/logging/config.py new file mode 100644 index 0000000..d8940e6 --- /dev/null +++ b/src/eventkit/logging/config.py @@ -0,0 +1,73 @@ +"""Logging configuration for eventkit using structlog.""" + +import logging +import sys +from typing import Any + +import structlog + + +def configure_logging(json_logs: bool = False, log_level: str = "INFO") -> None: + """Configure structlog for development or production. + + Args: + json_logs: If True, use JSON formatter for production. + If False, use colored console formatter for development. + log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). 
+ """ + # Convert string log level to logging constant + numeric_level = getattr(logging, log_level.upper(), logging.INFO) + + # Configure stdlib logging + logging.basicConfig( + format="%(message)s", + stream=sys.stdout, + level=numeric_level, + ) + + # Shared processors for both formatters + processors: list[Any] = [ + structlog.contextvars.merge_contextvars, # Merge context variables + structlog.stdlib.add_log_level, # Add log level to event dict + structlog.stdlib.add_logger_name, # Add logger name to event dict + structlog.processors.TimeStamper(fmt="iso"), # ISO 8601 timestamps + structlog.processors.StackInfoRenderer(), # Render stack info + structlog.processors.format_exc_info, # Format exception info + structlog.processors.UnicodeDecoder(), # Decode unicode + ] + + # Choose renderer based on environment + renderer: Any + if json_logs: + # Production: single-line JSON + renderer = structlog.processors.JSONRenderer() + else: + # Development: colored console output + renderer = structlog.dev.ConsoleRenderer(colors=True) + + # Configure structlog + structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, # Filter by log level first + *processors, + structlog.stdlib.ProcessorFormatter.wrap_for_formatter, + ], + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + wrapper_class=structlog.stdlib.BoundLogger, + cache_logger_on_first_use=True, + ) + + # Configure stdlib logging to use structlog processor + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter( + structlog.stdlib.ProcessorFormatter( + processor=renderer, + foreign_pre_chain=processors, + ) + ) + + root_logger = logging.getLogger() + root_logger.handlers.clear() + root_logger.addHandler(handler) + root_logger.setLevel(numeric_level) From ef177a0c8cae665bb30e045c1d5ce3c308d4e9b1 Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 12 Jan 2026 13:24:22 -0800 Subject: [PATCH 2/8] feat: add structured logging to API endpoints Add logging to API router with context propagation and timing: Changes: - Use structlog.get_logger() instead of stdlib logging - Bind request context (request_id, stream) for all logs - Log request_received with method, path, event_type - Log request_completed with status_code, duration_ms - Log request_failed with error, error_type, status_code - Clear context after request (in finally block) - Move logging configuration to lifespan (startup) Metrics Logged: - duration_ms: Request processing time - event_type: Type of event received - status_code: HTTP response status Context Variables: - request_id: UUID for tracing - stream: Event stream name All tests passing. --- src/eventkit/api/app.py | 27 +++++++++++++++--------- src/eventkit/api/router.py | 43 +++++++++++++++++++++++++++++++++----- 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/src/eventkit/api/app.py b/src/eventkit/api/app.py index f212f94..9497d3e 100644 --- a/src/eventkit/api/app.py +++ b/src/eventkit/api/app.py @@ -4,6 +4,7 @@ from collections.abc import AsyncGenerator from contextlib import asynccontextmanager +import structlog from fastapi import FastAPI from eventkit.api.dependencies import get_queue @@ -20,6 +21,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: Manage application lifecycle. 
Startup: + - Configure logging (structlog with JSON/console formatters) - Start EventQueue (which starts processor, buffer flusher, and ring buffer publisher) - Ring buffer publisher begins moving events from ring buffer to internal queue - Queue workers begin processing events @@ -42,24 +44,36 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: Yields: Control to the application during its lifetime """ + # Configure logging + settings = Settings() # type: ignore[call-arg] # Settings requires GCP_PROJECT_ID from env + configure_logging( + json_logs=settings.EVENTKIT_JSON_LOGS, + log_level=settings.EVENTKIT_LOG_LEVEL, + ) + + # Get structured logger after configuration + app_logger = structlog.get_logger(__name__) + # Startup - queue manages ring buffer, publisher, workers, and processor queue = get_queue() await queue.start() - logger.info("Application started - ring buffer + queue active") + app_logger.info("application_started", mode=settings.EVENTKIT_QUEUE_MODE.value) yield # Shutdown - gracefully drain ring buffer and queue - logger.info("Application shutting down - draining ring buffer and queue") + app_logger.info("application_shutting_down") await queue.stop() - logger.info("Application stopped") + app_logger.info("application_stopped") def create_app() -> FastAPI: """ Create and configure FastAPI application. + Logging is configured in the lifespan function (on startup). + Returns: Configured FastAPI application instance @@ -73,13 +87,6 @@ def create_app() -> FastAPI: app = create_app() uvicorn.run(app, host="0.0.0.0", port=8000) """ - # Configure logging based on settings - settings = Settings() # type: ignore[call-arg] # Settings requires GCP_PROJECT_ID from env - configure_logging( - json_logs=settings.EVENTKIT_JSON_LOGS, - log_level=settings.EVENTKIT_LOG_LEVEL, - ) - app = FastAPI( title="eventkit", description="Event processing pipeline for customer data", diff --git a/src/eventkit/api/router.py b/src/eventkit/api/router.py index 66c6b25..c692978 100644 --- a/src/eventkit/api/router.py +++ b/src/eventkit/api/router.py @@ -1,7 +1,9 @@ """API router for event collection endpoints.""" -import logging +import time +import uuid +import structlog from fastapi import APIRouter, Depends, Request from fastapi.responses import JSONResponse @@ -10,7 +12,7 @@ from eventkit.schema.raw import RawEvent from eventkit.stores.event_store import EventStore -logger = logging.getLogger(__name__) +logger = structlog.get_logger(__name__) router = APIRouter() @@ -55,20 +57,48 @@ async def collect( POST /collect/users {"type": "identify", "userId": "user_123"} """ + # Bind context for this request (propagates to all logs in this call chain) + request_id = str(uuid.uuid4()) + structlog.contextvars.bind_contextvars( + request_id=request_id, + stream=stream, + ) + + start_time = time.perf_counter() payload = await request.json() + + logger.info( + "request_received", + method="POST", + path=f"/collect/{stream}", + event_type=payload.get("type"), + ) + raw_event = RawEvent(payload=payload, stream=stream) try: # Enqueue event (durable write via internal ring buffer) await queue.enqueue(raw_event) + + duration_ms = (time.perf_counter() - start_time) * 1000 + logger.info( + "request_completed", + status_code=202, + duration_ms=round(duration_ms, 2), + ) + return JSONResponse({"status": "accepted"}, status_code=202) except Exception as e: # Ring buffer write failed - catastrophic failure (disk full, corruption, etc.) 
# Return 503 to tell client to retry later - logger.critical( - "Ring buffer write failed - service unavailable", - extra={"error": str(e), "stream": stream, "event_type": payload.get("type")}, + duration_ms = (time.perf_counter() - start_time) * 1000 + logger.error( + "request_failed", + error=str(e), + error_type=type(e).__name__, + status_code=503, + duration_ms=round(duration_ms, 2), exc_info=True, ) return JSONResponse( @@ -78,6 +108,9 @@ async def collect( }, status_code=503, ) + finally: + # Clear context after request + structlog.contextvars.clear_contextvars() @router.post("/v1/identify") From 98ce0fa178d5dcb5781e3582c2b8327f30a39f57 Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 12 Jan 2026 13:25:40 -0800 Subject: [PATCH 3/8] feat: add structured logging to Processor Add logging to event processing pipeline: Changes: - Log event_received (DEBUG): event_type, stream - Log adaptation_failed (WARN): error, event_type, stream - Log event_adapted (DEBUG): event_type - Log event_sequenced (DEBUG): partition Never logs full event payloads (PII safety). All tests passing. --- src/eventkit/processing/processor.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/eventkit/processing/processor.py b/src/eventkit/processing/processor.py index a910d17..f25a0a2 100644 --- a/src/eventkit/processing/processor.py +++ b/src/eventkit/processing/processor.py @@ -16,12 +16,16 @@ from datetime import UTC, datetime +import structlog + from eventkit.adapters.base import EventAdapter from eventkit.processing.buffer import Buffer from eventkit.processing.sequencer import Sequencer from eventkit.schema.raw import RawEvent from eventkit.stores.error_store import ErrorStore +logger = structlog.get_logger(__name__) + class Processor: """ @@ -98,11 +102,23 @@ async def process_event(self, raw_event: RawEvent) -> None: Args: raw_event: RawEvent to process """ + logger.debug( + "event_received", + event_type=raw_event.payload.get("type"), + stream=raw_event.stream, + ) + # Step 1: Adapt (validate & normalize) result = self.adapter.adapt(raw_event) if not result.ok: # Invalid event → error store + logger.warning( + "adaptation_failed", + error=result.error or "Unknown error", + event_type=raw_event.payload.get("type"), + stream=raw_event.stream, + ) await self.error_store.store_error( payload=raw_event.payload, error=result.error or "Unknown error", @@ -114,9 +130,13 @@ async def process_event(self, raw_event: RawEvent) -> None: # Type narrowing: If ok=True, event must be present assert result.event is not None, "Adapter returned ok=True but event is None" + logger.debug("event_adapted", event_type=result.event.event_type) + # Step 2: Sequence (consistent routing) partition_id = self.sequencer.get_partition_id(result.event) + logger.debug("event_sequenced", partition=partition_id) + # Step 3: Buffer (batch writes) await self.buffer.enqueue(result.event, partition_id) From 2066f758bb4724b430d465f77ea1b13a503ed538 Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 12 Jan 2026 13:29:14 -0800 Subject: [PATCH 4/8] feat: add structured logging to Buffer with timing Add logging to buffer operations with performance metrics: Changes: - Log buffer_add (DEBUG): partition, buffer_size - Log buffer_flush_start (INFO): partition, event_count - Log buffer_flush_complete (INFO): partition, event_count, duration_ms Performance tracking with timing measurements for flush operations. All tests passing. 
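Pattern reference (sketch only, not part of the diff; write_batch
stands in for the real EventStore call):

```python
import time

import structlog

logger = structlog.get_logger(__name__)


def write_batch(events: list) -> None:
    """Stand-in for the real EventStore batch write."""


def flush_with_timing(partition_id: int, events: list) -> None:
    # Log start, do the batch write, then log completion with duration_ms.
    start_time = time.perf_counter()
    logger.info("buffer_flush_start", partition=partition_id, event_count=len(events))

    write_batch(events)

    duration_ms = (time.perf_counter() - start_time) * 1000
    logger.info(
        "buffer_flush_complete",
        partition=partition_id,
        event_count=len(events),
        duration_ms=round(duration_ms, 2),
    )
```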
--- src/eventkit/processing/buffer.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/eventkit/processing/buffer.py b/src/eventkit/processing/buffer.py index e37c49b..b560cf4 100644 --- a/src/eventkit/processing/buffer.py +++ b/src/eventkit/processing/buffer.py @@ -1,13 +1,18 @@ """Per-partition event buffer with size and time-based flushing.""" import asyncio +import time from datetime import UTC, datetime +import structlog + from eventkit.errors import BufferFullError from eventkit.processing.buffer_storage import BufferStorage, InMemoryBufferStorage from eventkit.schema.events import TypedEvent from eventkit.stores.event_store import EventStore +logger = structlog.get_logger(__name__) + class Buffer: """ @@ -97,9 +102,12 @@ async def enqueue(self, event: TypedEvent, partition_id: int) -> None: ) await self.storage.append(partition_id, event) + buffer_size = self.storage.len(partition_id) + + logger.debug("buffer_add", partition=partition_id, buffer_size=buffer_size) # Size-based flush - if self.storage.len(partition_id) >= self.size: + if buffer_size >= self.size: await self._flush_partition(partition_id) async def _flush_partition(self, partition_id: int) -> None: @@ -113,6 +121,11 @@ async def _flush_partition(self, partition_id: int) -> None: if not events: return + start_time = time.perf_counter() + event_count = len(events) + + logger.info("buffer_flush_start", partition=partition_id, event_count=event_count) + # Batch write to storage await self.event_store.store_batch(events) @@ -120,6 +133,14 @@ async def _flush_partition(self, partition_id: int) -> None: await self.storage.clear(partition_id) self.last_flush[partition_id] = datetime.now(UTC) + duration_ms = (time.perf_counter() - start_time) * 1000 + logger.info( + "buffer_flush_complete", + partition=partition_id, + event_count=event_count, + duration_ms=round(duration_ms, 2), + ) + async def start_flusher(self) -> None: """ Start background time-based flusher task. From 0ae5db2faa9f007142247250b4c4bcf70cc19446 Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 12 Jan 2026 13:32:33 -0800 Subject: [PATCH 5/8] feat: add structured logging to EventStore and ErrorStore Add logging to storage operations: EventStore (FirestoreEventStore): - Log store_write_start (INFO): event_count - Log store_write_complete (INFO): event_count, duration_ms, collection - Log store_write_failed (ERROR): event_count, error, error_type, duration_ms ErrorStore (FirestoreErrorStore): - Log error_stored (WARN): stream, error Never logs full event payloads (PII safety). All tests passing. 
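Illustrative output: with EVENTKIT_JSON_LOGS=true, a successful batch
write renders as a single JSON line shaped like this (values made up;
field set matches the store_write_complete call):

```json
{"event": "store_write_complete", "event_count": 500, "duration_ms": 12.34, "collection": "events", "level": "info", "timestamp": "2026-01-12T13:30:00.000Z", "logger": "eventkit.stores.firestore"}
```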
--- src/eventkit/stores/firestore.py | 39 +++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/src/eventkit/stores/firestore.py b/src/eventkit/stores/firestore.py index 88e375d..cb65720 100644 --- a/src/eventkit/stores/firestore.py +++ b/src/eventkit/stores/firestore.py @@ -6,9 +6,11 @@ """ import asyncio +import time from datetime import datetime from typing import Any +import structlog from google.api_core.exceptions import ( DeadlineExceeded, InternalServerError, @@ -24,6 +26,8 @@ from eventkit.schema.events import TypedEvent +logger = structlog.get_logger(__name__) + class FirestoreEventStore: """ @@ -130,7 +134,33 @@ async def store_batch(self, events: list[TypedEvent]) -> None: Raises: StorageError: If any batch cannot be stored after retries """ - await asyncio.to_thread(self._sync_store_batch, events) + event_count = len(events) + start_time = time.perf_counter() + + logger.info("store_write_start", event_count=event_count) + + try: + await asyncio.to_thread(self._sync_store_batch, events) + + duration_ms = (time.perf_counter() - start_time) * 1000 + logger.info( + "store_write_complete", + event_count=event_count, + duration_ms=round(duration_ms, 2), + collection="events", + ) + + except Exception as e: + duration_ms = (time.perf_counter() - start_time) * 1000 + logger.error( + "store_write_failed", + event_count=event_count, + error=str(e), + error_type=type(e).__name__, + duration_ms=round(duration_ms, 2), + exc_info=True, + ) + raise def _sync_store_batch(self, events: list[TypedEvent]) -> None: """Synchronous batch storage with chunking and retry logic.""" @@ -295,6 +325,13 @@ async def store_error( Raises: StorageError: If the error cannot be stored after retries """ + stream = metadata.get("stream") if metadata else None + logger.warning( + "error_stored", + stream=stream, + error=error, + ) + await asyncio.to_thread(self._sync_store_error, payload, error, timestamp, metadata) def _sync_store_error( From a06dfaff5f7e7cd1447e6158b2172e30d6ebba70 Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 12 Jan 2026 13:33:16 -0800 Subject: [PATCH 6/8] test: add unit tests for logging configuration Add comprehensive tests for structlog configuration: Configuration Tests: - Log level setting (DEBUG, INFO) - JSON mode configuration - Console mode configuration - Logger functionality after config Context Propagation Tests: - Context variable binding - Context variable clearing All tests passing. 
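Possible follow-up (not in this patch): assert on emitted events with
structlog's built-in testing helper, which temporarily swaps in a
capturing processor:

```python
import structlog
from structlog.testing import capture_logs


def test_capture_logs_records_event_fields():
    logger = structlog.get_logger(__name__)

    with capture_logs() as cap_logs:
        logger.info("request_completed", status_code=202)

    # Each captured entry is the event dict, including bound key-value pairs.
    assert cap_logs[0]["event"] == "request_completed"
    assert cap_logs[0]["status_code"] == 202
```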
--- tests/unit/logging/test_config.py | 86 +++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 tests/unit/logging/test_config.py diff --git a/tests/unit/logging/test_config.py b/tests/unit/logging/test_config.py new file mode 100644 index 0000000..f8f4b8c --- /dev/null +++ b/tests/unit/logging/test_config.py @@ -0,0 +1,86 @@ +"""Tests for structured logging configuration.""" + +import logging + +import structlog + +from eventkit.logging.config import configure_logging + + +class TestLoggingConfiguration: + """Test logging configuration.""" + + def test_configure_logging_sets_log_level(self): + """Test that configure_logging sets the correct log level.""" + configure_logging(json_logs=False, log_level="DEBUG") + + root_logger = logging.getLogger() + assert root_logger.level == logging.DEBUG + + def test_configure_logging_info_level(self): + """Test that configure_logging sets INFO level by default.""" + configure_logging(json_logs=False, log_level="INFO") + + root_logger = logging.getLogger() + assert root_logger.level == logging.INFO + + def test_configure_logging_json_mode(self): + """Test that configure_logging works with JSON mode.""" + # Should not raise exceptions + configure_logging(json_logs=True, log_level="INFO") + + # Get logger and verify it works + logger = structlog.get_logger(__name__) + logger.info("test_message", key="value") + + def test_configure_logging_console_mode(self): + """Test that configure_logging works with console mode.""" + # Should not raise exceptions + configure_logging(json_logs=False, log_level="INFO") + + # Get logger and verify it works + logger = structlog.get_logger(__name__) + logger.info("test_message", key="value") + + def test_structlog_logger_works_after_configuration(self): + """Test that structlog loggers work after configuration.""" + configure_logging(json_logs=False, log_level="DEBUG") + + logger = structlog.get_logger(__name__) + + # Should not raise exceptions + logger.debug("debug_message", foo="bar") + logger.info("info_message", count=42) + logger.warning("warning_message", error="test") + logger.error("error_message", exception=Exception("test")) + + +class TestContextPropagation: + """Test structlog context propagation.""" + + def test_context_vars_binding(self): + """Test that context variables are bound correctly.""" + configure_logging(json_logs=False, log_level="DEBUG") + + # Bind context + structlog.contextvars.bind_contextvars(request_id="test-123", stream="users") + + logger = structlog.get_logger(__name__) + + # Log message (context should be included automatically) + logger.info("test_message") + + # Clear context + structlog.contextvars.clear_contextvars() + + def test_context_vars_cleared(self): + """Test that context variables are cleared correctly.""" + configure_logging(json_logs=False, log_level="DEBUG") + + # Bind and clear + structlog.contextvars.bind_contextvars(request_id="test-456") + structlog.contextvars.clear_contextvars() + + # Should not raise exceptions + logger = structlog.get_logger(__name__) + logger.info("test_message") From 35138acb0e8179c807a7dc1cbc242bea5d447ed0 Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 12 Jan 2026 13:34:32 -0800 Subject: [PATCH 7/8] docs: add logging documentation for developers Update LOCAL_DEV.md and CLAUDE.md with structured logging guides: LOCAL_DEV.md: - Development vs production logging - Log level configuration - JSON logs for production - Example output for both modes - Log levels and what they mean - What is/is not logged (PII safety) - 
Context propagation for tracing CLAUDE.md: - How to use structlog correctly - What to log at each level - What NEVER to log (PII, tokens, etc) - Context propagation patterns - Performance timing examples Complete structured logging implementation. --- CLAUDE.md | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++++ LOCAL_DEV.md | 74 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index 1f4ae80..3839a45 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -56,6 +56,93 @@ Use [conventional commits](https://www.conventionalcommits.org/): Keep first line under 72 characters. +## Logging + +### Using Structured Logging + +Use `structlog.get_logger(__name__)` in all modules: + +```python +import structlog + +logger = structlog.get_logger(__name__) + +# Good: structured key-value pairs +logger.info("event_received", stream="users", event_type="identify") + +# Bad: string interpolation +logger.info(f"Received {event_type} event from {stream}") # Don't do this! +``` + +### What to Log + +**Always Log (INFO level)**: +- API requests: `method`, `path`, `status_code`, `duration_ms` +- Events received: `stream`, `event_type` (NOT full payload) +- Buffer flushes: `partition`, `event_count`, `duration_ms` +- Store writes: `event_count`, `duration_ms` + +**Sometimes Log (DEBUG level)**: +- Event adaptation details +- Sequencer routing: `event_id`, `partition` +- Worker lifecycle changes + +**Always Log (WARN level)**: +- Validation failures: `error_type`, `error` (NOT full event) +- Retries: `operation`, `attempt`, `reason` + +**Always Log (ERROR level)**: +- Store failures: `error`, `event_count` +- Ring buffer failures: `error`, `current_size` +- Worker crashes with stack traces (`exc_info=True`) + +### What NOT to Log + +**Never log**: +- Full event payloads (PII risk) +- User identifiers in plaintext +- Auth tokens or secrets + +### Context Propagation + +Use `structlog.contextvars` for automatic context in all logs: + +```python +import structlog + +# At API entry point (request handler) +structlog.contextvars.bind_contextvars( + request_id=str(uuid.uuid4()), + stream=stream, +) + +# Anywhere in the call chain (automatically includes request_id and stream) +logger.info("event_adapted", event_type="identify") + +# Clear context after request +structlog.contextvars.clear_contextvars() +``` + +### Performance Timing + +Log performance metrics for slow operations: + +```python +import time + +start_time = time.perf_counter() + +# ... operation ... + +duration_ms = (time.perf_counter() - start_time) * 1000 +logger.info( + "buffer_flush_complete", + partition=partition_id, + event_count=len(events), + duration_ms=round(duration_ms, 2), +) +``` + ## Critical Patterns ### Queue-Agnostic Processor diff --git a/LOCAL_DEV.md b/LOCAL_DEV.md index e2d31bc..d6c06d7 100644 --- a/LOCAL_DEV.md +++ b/LOCAL_DEV.md @@ -165,6 +165,80 @@ API → ring buffer (durable) → publisher → queue → workers → Firestore --- +## Logging + +eventkit uses structured logging via `structlog` for production observability. 
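+
+A minimal usage sketch (hypothetical module; the pattern mirrors the
+examples in CLAUDE.md):
+
+```python
+import structlog
+
+logger = structlog.get_logger(__name__)
+logger.info("event_received", stream="users", event_type="identify")
+```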
+ +### Development (Pretty Logs) + +By default, logs are colored and human-readable: + +```bash +export EVENTKIT_LOG_LEVEL="DEBUG" # or INFO (default), WARNING, ERROR +export EVENTKIT_JSON_LOGS="false" # default + +uv run uvicorn eventkit.api.app:app --reload --port 8000 +``` + +Example output: +``` +2026-01-12T10:30:45.123Z [info ] request_received method=POST path=/collect/users event_type=identify request_id=abc-123 stream=users +2026-01-12T10:30:45.125Z [debug ] event_received event_type=identify stream=users +2026-01-12T10:30:45.126Z [debug ] event_adapted event_type=identify +2026-01-12T10:30:45.127Z [debug ] event_sequenced partition=7 +2026-01-12T10:30:45.128Z [info ] request_completed status_code=202 duration_ms=5.23 request_id=abc-123 stream=users +``` + +### Production (JSON Logs) + +For log aggregation systems (e.g., GCP Logging, Datadog): + +```bash +export EVENTKIT_LOG_LEVEL="INFO" +export EVENTKIT_JSON_LOGS="true" + +uv run uvicorn eventkit.api.app:app --port 8000 +``` + +Example output (single-line JSON per log): +```json +{"event": "request_received", "method": "POST", "path": "/collect/users", "event_type": "identify", "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.123Z", "logger": "eventkit.api.router"} +{"event": "request_completed", "status_code": 202, "duration_ms": 5.23, "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.128Z", "logger": "eventkit.api.router"} +``` + +### Log Levels + +| Level | When to Use | Example Operations | +|-------|-------------|-------------------| +| **DEBUG** | Development debugging | Event adaptation details, sequencer routing | +| **INFO** | Normal operations | API requests, buffer flushes, store writes | +| **WARNING** | Recoverable errors | Validation failures, retries | +| **ERROR** | Unrecoverable errors | Store failures, ring buffer failures | + +### What's Logged + +**Always Logged:** +- API requests: method, path, status_code, duration_ms +- Events: stream, event_type (NOT full payload for PII safety) +- Buffer flushes: partition, event_count, duration_ms +- Store writes: event_count, duration_ms + +**Never Logged:** +- Full event payloads (PII risk) +- User identifiers in plaintext +- Auth tokens or secrets + +### Context Propagation + +All logs within a request include the same `request_id` and `stream` for tracing: + +```bash +# Example: Trace a single event through the entire pipeline +cat logs.json | jq 'select(.request_id == "abc-123")' +``` + +--- + ## Stopping Services ```bash From 9eefc52055062fe7036c2b0feba485e69a6b4850 Mon Sep 17 00:00:00 2001 From: prosdev Date: Mon, 12 Jan 2026 13:39:21 -0800 Subject: [PATCH 8/8] fix: update config tests for renamed logging settings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename test file and update assertions: - tests/unit/logging/test_config.py → test_logging_config.py (avoid conflict) - LOG_LEVEL → EVENTKIT_LOG_LEVEL - Add EVENTKIT_JSON_LOGS assertions All tests passing. 
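For reference, a sketch of how the renamed settings resolve from the
environment (assumes pydantic-settings' standard bool coercion, and
sets GCP_PROJECT_ID because app.py notes Settings requires it):

```python
import os

os.environ["GCP_PROJECT_ID"] = "demo-project"  # required field, per the app.py comment
os.environ["EVENTKIT_LOG_LEVEL"] = "DEBUG"
os.environ["EVENTKIT_JSON_LOGS"] = "true"  # "1"/"yes"/"on" also coerce to True

from eventkit.config import Settings

settings = Settings()
assert settings.EVENTKIT_LOG_LEVEL == "DEBUG"
assert settings.EVENTKIT_JSON_LOGS is True
```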
--- .../logging/{test_config.py => test_logging_config.py} | 0 tests/unit/test_config.py | 9 ++++++--- 2 files changed, 6 insertions(+), 3 deletions(-) rename tests/unit/logging/{test_config.py => test_logging_config.py} (100%) diff --git a/tests/unit/logging/test_config.py b/tests/unit/logging/test_logging_config.py similarity index 100% rename from tests/unit/logging/test_config.py rename to tests/unit/logging/test_logging_config.py diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 6ae483c..505a252 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -57,7 +57,8 @@ def test_settings_default_values(clean_env, monkeypatch): assert settings.EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL == 3600.0 # Logging - assert settings.LOG_LEVEL == "INFO" + assert settings.EVENTKIT_LOG_LEVEL == "INFO" + assert settings.EVENTKIT_JSON_LOGS is False def test_settings_from_environment(clean_env, monkeypatch): @@ -70,7 +71,8 @@ def test_settings_from_environment(clean_env, monkeypatch): monkeypatch.setenv("EVENTKIT_NUM_PARTITIONS", "32") monkeypatch.setenv("EVENTKIT_QUEUE_MODE", "async") monkeypatch.setenv("EVENTKIT_ASYNC_WORKERS", "8") - monkeypatch.setenv("LOG_LEVEL", "DEBUG") + monkeypatch.setenv("EVENTKIT_LOG_LEVEL", "DEBUG") + monkeypatch.setenv("EVENTKIT_JSON_LOGS", "true") settings = Settings() @@ -82,7 +84,8 @@ def test_settings_from_environment(clean_env, monkeypatch): assert settings.EVENTKIT_NUM_PARTITIONS == 32 assert settings.EVENTKIT_QUEUE_MODE == QueueMode.ASYNC assert settings.EVENTKIT_ASYNC_WORKERS == 8 - assert settings.LOG_LEVEL == "DEBUG" + assert settings.EVENTKIT_LOG_LEVEL == "DEBUG" + assert settings.EVENTKIT_JSON_LOGS is True def test_ring_buffer_config_from_environment(clean_env, monkeypatch):
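
Taken together, the series can be smoke-tested with a sketch like this
(outside the patches; assumes the modules added above and prints one
JSON line per log to stdout):

```python
import structlog

from eventkit.logging import configure_logging

configure_logging(json_logs=True, log_level="INFO")

# Bind request-scoped context; it is merged into every log below.
structlog.contextvars.bind_contextvars(request_id="demo-123", stream="users")

logger = structlog.get_logger("eventkit.demo")
logger.info("request_received", method="POST", path="/collect/users")
logger.info("request_completed", status_code=202, duration_ms=5.23)

structlog.contextvars.clear_contextvars()
```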