Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,64 @@ POST /v1/page # routes to "pages" stream
}
```

## Validation & Adaptation

`eventkit` uses a two-tier architecture for transforming raw events into typed, validated events:

### Validators

Composable validators perform lightweight checks on raw event payloads:

```python
from eventkit.adapters.validators import (
ValidationPipeline,
RequiredFieldsValidator,
TypeCheckValidator,
TimestampValidator,
)

# Build custom validation pipeline
pipeline = ValidationPipeline([
RequiredFieldsValidator(["type", "userId"]),
TypeCheckValidator({"type": str, "userId": str}),
TimestampValidator(),
])

is_valid, error = pipeline.validate({"type": "identify", "userId": "123"})
```

**Built-in Validators:**
- `RequiredFieldsValidator` - Ensure required fields are present
- `TypeCheckValidator` - Validate field types (supports union types)
- `TimestampValidator` - Parse ISO 8601 and Unix timestamps
- `ValidationPipeline` - Compose validators with fail-fast behavior

### Adapters

Schema adapters transform validated `RawEvent` → typed `TypedEvent`:

```python
from eventkit.adapters import SegmentSchemaAdapter
from eventkit.schema.raw import RawEvent

adapter = SegmentSchemaAdapter()
raw = RawEvent(payload={"type": "identify", "userId": "123"})
result = adapter.adapt(raw)

if result.ok:
event = result.event # IdentifyEvent | TrackEvent | PageEvent
else:
error = result.error # Route to dead letter queue
```

**Built-in Adapters:**
- `SegmentSchemaAdapter` - Segment-compatible event spec (identify/track/page)

**Future Adapters:**
- `CustomSchemaAdapter` - Dynamic per-account schemas (in `eventkit-schema` package)
- `SnowplowAdapter` - Snowplow event format
- `AmplitudeAdapter` - Amplitude HTTP API format

## Configuration

```python
Expand Down Expand Up @@ -265,10 +323,11 @@ uv run ruff format src/
## Roadmap

### Core (v0.x)
- [x] Composable validators (required fields, types, timestamps)
- [x] Segment-compatible adapter with ValidationPipeline
- [ ] Collection API with stream routing
- [ ] Segment-compatible adapter
- [ ] Hash-based sequencer for consistent ordering
- [ ] Firestore storage backend
- [ ] Firestore storage backend (in progress)
- [ ] Error handling and dead letter queue
- [ ] Structured logging
- [ ] Performance benchmarks (10k+ events/sec)
Expand Down
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"google-cloud-firestore>=2.13.0",
"structlog>=23.2.0",
"tenacity>=8.2.0",
"python-dateutil>=2.9.0.post0",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -80,3 +81,8 @@ disallow_untyped_defs = true
[[tool.mypy.overrides]]
module = "google.cloud.*"
ignore_missing_imports = true

[dependency-groups]
dev = [
"types-python-dateutil>=2.9.0.20251115",
]
44 changes: 29 additions & 15 deletions specs/core-pipeline/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,13 @@ class FirestoreErrorStore:
Create composable validators for lightweight event validation, including a ValidationPipeline for composition.

#### Acceptance Criteria
- [ ] Validator Protocol defined
- [ ] RequiredFieldsValidator (check field presence)
- [ ] TypeCheckValidator (validate basic types)
- [ ] TimestampValidator (parse ISO 8601, Unix timestamps)
- [ ] ValidationPipeline (composable pipeline)
- [ ] Validators are composable
- [ ] Unit tests for each validator
- [x] Validator Protocol defined
- [x] RequiredFieldsValidator (check field presence)
- [x] TypeCheckValidator (validate basic types)
- [x] TimestampValidator (parse ISO 8601, Unix timestamps)
- [x] ValidationPipeline (composable pipeline)
- [x] Validators are composable
- [x] Unit tests for each validator

#### Checklist
```python
Expand Down Expand Up @@ -331,15 +331,15 @@ class ValidationPipeline:
- `src/eventkit/adapters/validators/type_check.py` (new)
- `src/eventkit/adapters/validators/timestamp.py` (new)
- `src/eventkit/adapters/validators/pipeline.py` (new)
- `tests/unit/adapters/validators/test_base.py` (new)
- `tests/unit/adapters/validators/test_validator_protocol.py` (new)
- `tests/unit/adapters/validators/test_required_fields.py` (new)
- `tests/unit/adapters/validators/test_type_check.py` (new)
- `tests/unit/adapters/validators/test_timestamp.py` (new)
- `tests/unit/adapters/validators/test_pipeline.py` (new)

---

### Task 6: Implement Segment Adapter
### Task 6: Implement Segment Schema Adapter
**Estimated effort**: 2-3 hours
**Dependencies**: Task 5
**Phase**: Validation & Adaptation
Expand All @@ -348,12 +348,12 @@ class ValidationPipeline:
Create Segment-compatible schema adapter with ValidationPipeline and type routing.

#### Acceptance Criteria
- [ ] SegmentSchemaAdapter implements EventAdapter Protocol
- [ ] Uses ValidationPipeline for composable validation
- [ ] Routes by type: identify, track, page
- [ ] Ref extraction (userId, anonymousId)
- [ ] Invalid events return AdapterResult.err (not exception)
- [ ] Unit tests with 100% coverage
- [x] SegmentSchemaAdapter implements EventAdapter Protocol
- [x] Uses ValidationPipeline for composable validation
- [x] Routes by type: identify, track, page
- [x] Ref extraction (userId, anonymousId)
- [x] Invalid events return AdapterResult.failure (not exception)
- [x] Unit tests with 88% coverage (exception handlers are defensive)

#### Checklist
```python
Expand Down Expand Up @@ -400,7 +400,21 @@ class SegmentSchemaAdapter:

#### Files Changed
- `src/eventkit/adapters/segment.py` (new)
- `src/eventkit/adapters/__init__.py` (updated - export SegmentSchemaAdapter)
- `src/eventkit/schema/events.py` (updated - add stream field to TypedEvent)
- `tests/unit/adapters/test_segment.py` (new)
- `README.md` (updated - add Validation & Adaptation section)

#### Notes
**Naming**: Uses `SegmentSchemaAdapter` (not `SegmentAdapter`) to distinguish from format adapters. This is a **schema adapter** that applies the Segment spec rules.

**ValidationPipeline**: Composable validator chain that can be reused. The factory method `ValidationPipeline.for_segment_spec()` creates a fixed pipeline for Segment compatibility. Future `CustomSchemaAdapter` will use `ValidationPipeline.from_schema(account_schema)` for dynamic validation.

**Two-Tier Architecture**:
1. **Format Adapters** (future): Parse raw formats (JSON, Avro, etc.) to `RawEvent`
2. **Schema Adapters** (now): Apply schema rules to `RawEvent` → `TypedEvent`

This separation enables future `eventkit-schema` package integration without breaking changes.

#### Notes
- Named `SegmentSchemaAdapter` to clarify it applies a fixed schema (Segment spec)
Expand Down
3 changes: 2 additions & 1 deletion src/eventkit/adapters/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Adapter interfaces and base types."""

from eventkit.adapters.base import AdapterResult, EventAdapter
from eventkit.adapters.segment import SegmentSchemaAdapter

__all__ = ["AdapterResult", "EventAdapter"]
__all__ = ["AdapterResult", "EventAdapter", "SegmentSchemaAdapter"]
152 changes: 152 additions & 0 deletions src/eventkit/adapters/segment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""Segment-compatible event adapter."""

from eventkit.adapters.base import AdapterResult
from eventkit.adapters.validators import ValidationPipeline, parse_timestamp
from eventkit.schema.events import IdentifyEvent, PageEvent, TrackEvent
from eventkit.schema.raw import RawEvent


class SegmentSchemaAdapter:
"""
Segment-compatible schema adapter.

Transforms RawEvent → TypedEvent (IdentifyEvent, TrackEvent, PageEvent)
following Segment's event spec:
https://segment.com/docs/connections/spec/

This is a fixed schema adapter (rules are hardcoded).
Future: CustomSchemaAdapter will build rules dynamically from per-account schemas.

Features:
- Lightweight validation via ValidationPipeline
- Type routing (identify/track/page)
- Ref extraction (userId, anonymousId)
- Error handling (returns AdapterResult.err, not exceptions)

Example:
adapter = SegmentSchemaAdapter()
raw = RawEvent(payload={"type": "identify", "userId": "123"})
result = adapter.adapt(raw)
if result.success:
event = result.event # IdentifyEvent
"""

def __init__(self) -> None:
"""Initialize with Segment spec validation pipeline."""
# Use ValidationPipeline (composable, reusable)
self.pipeline = ValidationPipeline.for_segment_spec()

# Type routing for Segment event types
self.type_mapping = {
"identify": self._adapt_identify,
"track": self._adapt_track,
"page": self._adapt_page,
}

def adapt(self, raw: RawEvent) -> AdapterResult:
"""
Adapts a raw event into a canonical typed event.

Args:
raw: The raw event received from ingestion

Returns:
AdapterResult with ok=True and typed event, or
AdapterResult with ok=False and error message
"""
# 1. Validate using pipeline
is_valid, error = self.pipeline.validate(raw.payload)
if not is_valid:
# Validators always return error message when is_valid is False
assert error is not None, "Validator must return error message on failure"
return AdapterResult.failure(error)

# 2. Route by type
event_type = raw.get("type")
builder = self.type_mapping.get(event_type)
if not builder:
return AdapterResult.failure(f"Unknown event type: {event_type}")

# 3. Build typed event
return builder(raw)

def _adapt_identify(self, raw: RawEvent) -> AdapterResult:
"""Adapt identify event."""
# Parse timestamp with fallback
ts_raw = raw.get("timestamp")
timestamp = parse_timestamp(ts_raw) if ts_raw else None
if timestamp is None:
timestamp = raw.received_at

# Require at least one identity field
user_id = raw.get("userId")
anon_id = raw.get("anonymousId")
if not user_id and not anon_id:
return AdapterResult.failure("identify event requires userId or anonymousId")

try:
event = IdentifyEvent(
user_id=user_id,
anonymous_id=anon_id,
timestamp=timestamp,
traits=raw.get("traits", {}),
properties=raw.get("properties", {}),
stream=raw.stream,
# PostHog-style property updates (use alias names)
**{"$set": raw.get("$set"), "$set_once": raw.get("$set_once")},
)
return AdapterResult.success(event)
except Exception as e:
return AdapterResult.failure(f"Failed to create IdentifyEvent: {e}")

def _adapt_track(self, raw: RawEvent) -> AdapterResult:
"""Adapt track event."""
# Require event name
event_name = raw.get("event")
if not event_name:
return AdapterResult.failure("track event requires 'event' field")

# Parse timestamp with fallback
ts_raw = raw.get("timestamp")
timestamp = parse_timestamp(ts_raw) if ts_raw else None
if timestamp is None:
timestamp = raw.received_at

try:
event = TrackEvent(
event_name=event_name,
user_id=raw.get("userId"),
anonymous_id=raw.get("anonymousId"),
timestamp=timestamp,
properties=raw.get("properties", {}),
stream=raw.stream,
)
return AdapterResult.success(event)
except Exception as e:
return AdapterResult.failure(f"Failed to create TrackEvent: {e}")

def _adapt_page(self, raw: RawEvent) -> AdapterResult:
"""Adapt page event."""
# Require page name
name = raw.get("name")
if not name:
return AdapterResult.failure("page event requires 'name' field")

# Parse timestamp with fallback
ts_raw = raw.get("timestamp")
timestamp = parse_timestamp(ts_raw) if ts_raw else None
if timestamp is None:
timestamp = raw.received_at

try:
event = PageEvent(
name=name,
user_id=raw.get("userId"),
anonymous_id=raw.get("anonymousId"),
timestamp=timestamp,
properties=raw.get("properties", {}),
stream=raw.stream,
)
return AdapterResult.success(event)
except Exception as e:
return AdapterResult.failure(f"Failed to create PageEvent: {e}")
16 changes: 16 additions & 0 deletions src/eventkit/adapters/validators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Event payload validators."""

from eventkit.adapters.validators.base import Validator
from eventkit.adapters.validators.pipeline import ValidationPipeline
from eventkit.adapters.validators.required_fields import RequiredFieldsValidator
from eventkit.adapters.validators.timestamp import TimestampValidator, parse_timestamp
from eventkit.adapters.validators.type_check import TypeCheckValidator

__all__ = [
"Validator",
"ValidationPipeline",
"RequiredFieldsValidator",
"TypeCheckValidator",
"TimestampValidator",
"parse_timestamp",
]
29 changes: 29 additions & 0 deletions src/eventkit/adapters/validators/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Base protocol for event validators."""

from typing import Any, Protocol


class Validator(Protocol):
"""
Protocol for event payload validators.

Validators perform lightweight checks on raw event payloads
before adapting them to typed events.

Returns:
tuple[bool, str | None]: (is_valid, error_message)
- (True, None) if validation passes
- (False, "error message") if validation fails
"""

def validate(self, payload: dict[str, Any]) -> tuple[bool, str | None]:
"""
Validates an event payload.

Args:
payload: The raw event payload dictionary

Returns:
A tuple of (is_valid, error_message)
"""
...
Loading
Loading