Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions specs/core-pipeline/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,12 @@ This separation enables future `eventkit-schema` package integration without bre
Implement hash-based routing to ensure consistent partition assignment per identity.

#### Acceptance Criteria
- [ ] Sequencer routes events by identity hash
- [ ] Uses FNV-1a hash algorithm
- [ ] Same userId always routes to same partition
- [ ] Configurable num_partitions (default: 16)
- [ ] Handles missing identifiers (fallback)
- [ ] Unit tests verify consistent routing
- [x] Sequencer routes events by identity hash
- [x] Uses Python's built-in hash() (future: upgrade to Murmur3 if needed)
- [x] Same userId always routes to same partition
- [x] Configurable num_partitions (default: 16)
- [x] Handles missing identifiers (fallback)
- [x] Unit tests verify consistent routing

#### Checklist
```python
Expand Down
5 changes: 5 additions & 0 deletions src/eventkit/processing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Event processing primitives."""

from eventkit.processing.sequencer import HashSequencer, Sequencer

__all__ = ["Sequencer", "HashSequencer"]
111 changes: 111 additions & 0 deletions src/eventkit/processing/sequencer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Hash-based event sequencer for consistent partition routing."""

from typing import Protocol

from eventkit.schema.events import TypedEvent


class Sequencer(Protocol):
"""
Protocol for event routing to partitions.

Implementations can use different routing strategies:
- Hash-based (HashSequencer)
- Round-robin
- Random
- Kafka-delegated

Example:
sequencer = HashSequencer(num_partitions=16)
partition = sequencer.get_partition_id(event)
"""

def get_partition_id(self, event: TypedEvent) -> int:
"""
Route event to partition.

Args:
event: The typed event to route

Returns:
Partition ID (0 to num_partitions-1)
"""
...


class HashSequencer:
"""
Hash-based event sequencer.

Routes events to partitions using Python's built-in hash() function.
Guarantees same identity → same partition for consistent ordering.

Routing priority:
1. userId (if present)
2. anonymousId (fallback)
3. eventId (final fallback)

Example:
>>> sequencer = HashSequencer(num_partitions=16)
>>> event = TypedEvent(user_id="alice")
>>> partition = sequencer.get_partition_id(event)
>>> # Same userId always routes to same partition
>>> assert sequencer.get_partition_id(event) == partition

Production notes:
- Hash function: Python's hash() (fast, good distribution)
- Future: Can add Murmur3 if distribution issues arise
- Production CDPs often use Murmur3 for better distribution
"""

def __init__(self, num_partitions: int = 16) -> None:
"""
Initialize sequencer.

Args:
num_partitions: Number of partitions (default: 16)

Raises:
ValueError: If num_partitions <= 0
"""
if num_partitions <= 0:
raise ValueError("num_partitions must be > 0")
self.num_partitions = num_partitions

def get_partition_id(self, event: TypedEvent) -> int:
"""
Route event to partition based on identity.

Args:
event: TypedEvent to route

Returns:
Partition ID (0 to num_partitions-1)
"""
routing_key = self._get_routing_key(event)
hash_value = hash(routing_key)
# abs() to handle negative hashes (Python's hash() can be negative)
return abs(hash_value) % self.num_partitions

def _get_routing_key(self, event: TypedEvent) -> str:
"""
Get routing key with priority fallback.

Priority:
1. userId (if present)
2. anonymousId (fallback)
3. eventId (final fallback)

Args:
event: TypedEvent

Returns:
Routing key string
"""
if event.user_id:
return f"userId:{event.user_id}"
elif event.anonymous_id:
return f"anonymousId:{event.anonymous_id}"
else:
# Fallback to eventId (rare - most events have identity)
return f"event:{event.event_id}"
220 changes: 220 additions & 0 deletions tests/unit/processing/test_sequencer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
"""Tests for HashSequencer."""

from collections import Counter

import pytest

from eventkit.processing.sequencer import HashSequencer
from eventkit.schema.events import IdentifyEvent, TrackEvent


class TestHashSequencerInit:
"""Tests for HashSequencer initialization."""

def test_default_partition_count(self):
"""Test default partition count is 16."""
sequencer = HashSequencer()
assert sequencer.num_partitions == 16

def test_custom_partition_count(self):
"""Test custom partition count."""
sequencer = HashSequencer(num_partitions=32)
assert sequencer.num_partitions == 32

def test_invalid_partition_count_zero(self):
"""Test that zero partitions raises ValueError."""
with pytest.raises(ValueError, match="num_partitions must be > 0"):
HashSequencer(num_partitions=0)

def test_invalid_partition_count_negative(self):
"""Test that negative partitions raises ValueError."""
with pytest.raises(ValueError, match="num_partitions must be > 0"):
HashSequencer(num_partitions=-1)


class TestHashSequencerConsistency:
"""Tests for routing consistency."""

def test_same_user_id_same_partition(self):
"""Events with same userId always route to same partition."""
sequencer = HashSequencer(num_partitions=16)

event1 = IdentifyEvent(user_id="alice")
event2 = IdentifyEvent(user_id="alice")

partition1 = sequencer.get_partition_id(event1)
partition2 = sequencer.get_partition_id(event2)

assert partition1 == partition2

def test_same_user_id_consistent_across_calls(self):
"""Same userId routes to same partition across 1000 calls."""
sequencer = HashSequencer(num_partitions=16)
event = IdentifyEvent(user_id="alice")

partitions = [sequencer.get_partition_id(event) for _ in range(1000)]

# All calls should return the same partition
assert len(set(partitions)) == 1

def test_same_anonymous_id_same_partition(self):
"""Events with same anonymousId route to same partition."""
sequencer = HashSequencer(num_partitions=16)

event1 = TrackEvent(event_name="click", anonymous_id="anon_123")
event2 = TrackEvent(event_name="view", anonymous_id="anon_123")

partition1 = sequencer.get_partition_id(event1)
partition2 = sequencer.get_partition_id(event2)

assert partition1 == partition2


class TestHashSequencerPartitionRange:
"""Tests for partition ID range validation."""

def test_partition_in_range(self):
"""Partition ID is always 0 <= id < num_partitions."""
sequencer = HashSequencer(num_partitions=16)

for i in range(1000):
event = IdentifyEvent(user_id=f"user{i}")
partition = sequencer.get_partition_id(event)
assert 0 <= partition < 16

def test_partition_in_range_custom_count(self):
"""Partition ID respects custom partition count."""
sequencer = HashSequencer(num_partitions=32)

for i in range(1000):
event = IdentifyEvent(user_id=f"user{i}")
partition = sequencer.get_partition_id(event)
assert 0 <= partition < 32


class TestHashSequencerPriorityFallback:
"""Tests for routing key priority (userId → anonymousId → eventId)."""

def test_user_id_takes_priority(self):
"""userId is used if present, even if anonymousId exists."""
sequencer = HashSequencer(num_partitions=16)

# Same userId, different anonymousId
event1 = IdentifyEvent(user_id="alice", anonymous_id="anon_1")
event2 = IdentifyEvent(user_id="alice", anonymous_id="anon_2")

# Should route to same partition (userId takes priority)
assert sequencer.get_partition_id(event1) == sequencer.get_partition_id(event2)

def test_anonymous_id_fallback(self):
"""anonymousId is used if userId is not present."""
sequencer = HashSequencer(num_partitions=16)

event1 = TrackEvent(event_name="click", anonymous_id="anon_123")
event2 = TrackEvent(event_name="view", anonymous_id="anon_123")

# Should route to same partition (same anonymousId)
assert sequencer.get_partition_id(event1) == sequencer.get_partition_id(event2)

def test_event_id_final_fallback(self):
"""eventId is used if neither userId nor anonymousId present."""
sequencer = HashSequencer(num_partitions=16)

# Create events without userId or anonymousId
event1 = TrackEvent(event_name="click", event_id="evt_123")
event2 = TrackEvent(event_name="view", event_id="evt_123")

# Should route to same partition (same eventId)
partition1 = sequencer.get_partition_id(event1)
partition2 = sequencer.get_partition_id(event2)

assert partition1 == partition2


class TestHashSequencerDistribution:
"""Tests for distribution quality."""

def test_distribution_quality(self):
"""Events distribute roughly evenly across partitions."""
sequencer = HashSequencer(num_partitions=16)

# Generate 10,000 events with different userIds
events = [IdentifyEvent(user_id=f"user{i}") for i in range(10000)]

partitions = [sequencer.get_partition_id(e) for e in events]
counts = Counter(partitions)

# Each partition should get ~625 events (10000 / 16)
# Allow 20% variance (500-750 events per partition)
for partition_id in range(16):
count = counts[partition_id]
assert 500 <= count <= 750, (
f"Partition {partition_id} has {count} events (expected 500-750)"
)

def test_no_odd_even_bias(self):
"""Ensure no bias toward odd or even partitions."""
sequencer = HashSequencer(num_partitions=16)

# Generate 10,000 events
events = [IdentifyEvent(user_id=f"user{i}") for i in range(10000)]

partitions = [sequencer.get_partition_id(e) for e in events]

# Count odd vs even partitions
odd_count = sum(1 for p in partitions if p % 2 == 1)
even_count = sum(1 for p in partitions if p % 2 == 0)

# Should be roughly 50/50
# Allow 10% variance (4500-5500 for each)
assert 4500 <= odd_count <= 5500, f"Odd count: {odd_count} (expected 4500-5500)"
assert 4500 <= even_count <= 5500, f"Even count: {even_count} (expected 4500-5500)"

def test_all_partitions_used(self):
"""Ensure all partitions are used with sufficient events."""
sequencer = HashSequencer(num_partitions=16)

# Generate 10,000 events
events = [IdentifyEvent(user_id=f"user{i}") for i in range(10000)]

partitions = [sequencer.get_partition_id(e) for e in events]
unique_partitions = set(partitions)

# All 16 partitions should be used
assert len(unique_partitions) == 16, (
f"Only {len(unique_partitions)} partitions used (expected 16)"
)


class TestHashSequencerEdgeCases:
"""Tests for edge cases."""

def test_empty_string_user_id(self):
"""Empty string userId falls back to anonymousId."""
sequencer = HashSequencer(num_partitions=16)

event = IdentifyEvent(user_id="", anonymous_id="anon_123")

# Should not raise an error
partition = sequencer.get_partition_id(event)
assert 0 <= partition < 16

def test_very_long_user_id(self):
"""Very long userId is handled correctly."""
sequencer = HashSequencer(num_partitions=16)

long_id = "x" * 10000
event = IdentifyEvent(user_id=long_id)

partition = sequencer.get_partition_id(event)
assert 0 <= partition < 16

def test_unicode_user_id(self):
"""Unicode characters in userId work correctly."""
sequencer = HashSequencer(num_partitions=16)

event1 = IdentifyEvent(user_id="用户123")
event2 = IdentifyEvent(user_id="用户123")

# Same userId should route to same partition
assert sequencer.get_partition_id(event1) == sequencer.get_partition_id(event2)
Loading