Merged
Commits
18 commits
7672547
feat(storage): add GCS dependencies and configuration
prosdev Jan 13, 2026
5f923fb
feat(storage): implement GCSEventStore schema mapping
prosdev Jan 13, 2026
a96a414
feat(storage): implement GCS Parquet writing with retries
prosdev Jan 13, 2026
cfafed4
feat(storage): wire GCSEventStore with adaptive batching
prosdev Jan 13, 2026
2959855
feat(loaders): add WarehouseLoader protocol
prosdev Jan 13, 2026
f48e6db
feat(loaders): implement BigQueryLoader lifecycle
prosdev Jan 13, 2026
b80a20a
feat(loaders): implement file discovery and filtering
prosdev Jan 13, 2026
f0c0ea2
feat(loaders): implement BigQuery batch loading
prosdev Jan 13, 2026
d89050e
feat(loaders): add structured logging with performance metrics
prosdev Jan 13, 2026
562aeb6
feat(loaders): wire BigQueryLoader into FastAPI lifespan
prosdev Jan 13, 2026
6671182
feat(api): add warehouse loader health checks
prosdev Jan 13, 2026
9fbc7b4
feat(scripts): add BigQuery and GCS production scripts
prosdev Jan 13, 2026
32bd21f
test(integration): add GCS and BigQuery loader integration tests
prosdev Jan 13, 2026
1111f73
docs: update documentation for GCS + BigQuery storage
prosdev Jan 13, 2026
d66be39
docs(spec): mark Phase 1-6 tasks complete
prosdev Jan 14, 2026
3eef797
fix: Fix integration tests and remove BigQuery loader integration tests
prosdev Jan 14, 2026
8199644
fix: Mock GCP clients before instantiation in unit tests
prosdev Jan 14, 2026
e13da4d
perf: Optimize CI for faster iteration (4.5x speedup)
prosdev Jan 14, 2026
38 changes: 31 additions & 7 deletions .github/workflows/test.yml
@@ -11,9 +11,8 @@ concurrency:
cancel-in-progress: ${{ github.event_name == 'pull_request' }}

jobs:
test:
lint:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

@@ -22,9 +21,6 @@ jobs:
with:
python-version: "3.12"

- name: Start Firestore and Pub/Sub Emulators
run: docker compose up -d --wait

- name: Install uv
uses: astral-sh/setup-uv@v4
with:
@@ -46,13 +42,41 @@ jobs:
run: |
uv run mypy src/eventkit

- name: Run tests
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"

- name: Start Firestore and Pub/Sub Emulators
run: docker compose up -d --wait

- name: Install uv
uses: astral-sh/setup-uv@v4
with:
enable-cache: true

- name: Install dependencies
run: |
uv sync --frozen --all-extras

- name: Run unit tests
run: |
uv run pytest tests/unit/ -n auto --dist loadgroup -q --cov=src/eventkit --cov-report=term-missing --cov-report=xml

- name: Run integration tests
env:
FIRESTORE_EMULATOR_HOST: localhost:8080
PUBSUB_EMULATOR_HOST: localhost:8085
STORAGE_EMULATOR_HOST: http://localhost:9023
GCP_PROJECT_ID: test-project
run: |
uv run pytest --cov=src/eventkit --cov-report=term-missing --cov-report=xml
uv run pytest tests/integration/ -q --cov=src/eventkit --cov-append --cov-report=term-missing --cov-report=xml

- name: Upload coverage
uses: codecov/codecov-action@v4
174 changes: 127 additions & 47 deletions ARCHITECTURE.md
@@ -56,12 +56,16 @@ eventkit provides these primitives as a composable, type-safe library.
┌─────────────────────────────────────────────────────────────────┐
│ Phase 4: Batching & Storage │
│ │
│ EventLoader → EventStore / ErrorStore → Firestore │
│ EventLoader → EventStore → GCS (Parquet) │
│ ↓ │
│ WarehouseLoader → BigQuery │
│ │
│ • EventLoader: Time & size-based flushing │
│ • EventStore: Subcollections per stream │
│ • EventLoader: Time & size-based flushing (adaptive batching) │
│ • EventStore: Pluggable (GCS, Firestore, custom) │
│ • GCS: Hive-partitioned Parquet files (date=YYYY-MM-DD/) │
│ • WarehouseLoader: Background poller for batch loading │
│ • BigQuery: Query layer with idempotent loads │
│ • ErrorStore: Separate DLQ collection │
│ • Batch writes (500 events max per Firestore batch) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
@@ -256,65 +260,141 @@ class EventLoader:

### Phase 4: Storage

#### EventStore
**File:** `src/eventkit/stores/firestore.py`
eventkit supports pluggable storage backends via the `EventStore` protocol. The default is **GCS + BigQuery** for production deployments.

#### GCSEventStore (Default)
**File:** `src/eventkit/stores/gcs.py`

Persists canonical `TypedEvent` objects to Firestore.
Writes events to Google Cloud Storage as Parquet files, then loads to BigQuery via a background loader.

**Key Design Decisions:**

1. **Subcollections per stream**
1. **Hive-style partitioning**
```
events/
{stream}/
events/
{event_id}
gs://my-events/
date=2026-01-13/
{uuid1}.parquet
{uuid2}.parquet
date=2026-01-14/
{uuid3}.parquet
```
**Why:** Stream isolation, independent scaling, simpler queries.
**Why:** Efficient BigQuery loading, cost-effective lifecycle management.

2. **Async wrappers with `asyncio.to_thread()`**
- Firestore Python client is synchronous
- Use thread pool to avoid blocking event loop
- Pragmatic choice over reimplementing async client
2. **Wide schema (sparse columns)**
- Single Parquet file with all event type fields
- Nullable columns for type-specific fields (e.g., `event_name` only for track events)
- Parquet handles sparse data efficiently
- Simpler queries than separate tables per event type

3. **Retry logic with `tenacity`**
- Exponential backoff for transient failures
- Max 3 retries per operation
- Fails fast on non-retriable errors
3. **Pandas → Parquet → GCS**
- Convert events to DataFrame for columnar representation
- Serialize to Parquet with PyArrow
- Upload to GCS with retry logic

4. **Batch writes (500 event limit)**
- Firestore batch limit: 500 operations
- Automatically chunk larger batches
4. **Retry logic with `tenacity`**
- Exponential backoff for transient GCS failures
- Max 3 retries per operation

**Code Pattern:**
```python
class FirestoreEventStore:
class GCSEventStore:
async def store_batch(self, events: list[TypedEvent]) -> None:
await asyncio.to_thread(self._sync_store_batch, events)

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(ServiceUnavailable)
)
def _sync_store_batch(self, events: list[TypedEvent]) -> None:
# Chunk into Firestore batch size limit
for chunk in self._chunk_events(events, 500):
batch = self.db.batch()
for event in chunk:
doc_ref = self._get_doc_ref(event)
batch.set(doc_ref, self._event_to_dict(event))
batch.commit()
# Group by date for partitioning
by_date = defaultdict(list)
for event in events:
date = event.timestamp.date()
by_date[date].append(event)

# Write one file per date
for date, day_events in by_date.items():
df = self._events_to_dataframe(day_events)
path = self._generate_path(date) # date=YYYY-MM-DD/{uuid}.parquet
await self._write_parquet(df, path)
```
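The grouping-and-path logic in the pattern above can be sketched end to end in plain Python. This is an illustrative sketch, not eventkit's actual implementation: the helper names (`events_to_rows`, `plan_batch`) are assumptions, and the real store builds a pandas DataFrame and a Parquet file where this uses plain dicts.

```python
import uuid
from collections import defaultdict
from datetime import date, datetime, timezone

# Wide schema: every column exists on every row; type-specific
# fields stay None (sparse) — e.g. event_name only for track events.
COLUMNS = ["event_id", "timestamp", "event_type", "user_id", "event_name"]

def events_to_rows(events: list[dict]) -> list[dict]:
    """Normalize events to the wide schema, filling absent fields with None."""
    return [{col: e.get(col) for col in COLUMNS} for e in events]

def partition_path(d: date) -> str:
    """Hive-style partition path: date=YYYY-MM-DD/{uuid}.parquet."""
    return f"date={d.isoformat()}/{uuid.uuid4().hex}.parquet"

def plan_batch(events: list[dict]) -> dict[str, list[dict]]:
    """Group events by event date; one output file per date partition."""
    by_date: dict[date, list[dict]] = defaultdict(list)
    for e in events:
        by_date[e["timestamp"].date()].append(e)
    return {partition_path(d): events_to_rows(evts) for d, evts in by_date.items()}

events = [
    {"event_id": "1", "timestamp": datetime(2026, 1, 13, tzinfo=timezone.utc),
     "event_type": "track", "user_id": "u1", "event_name": "signup"},
    {"event_id": "2", "timestamp": datetime(2026, 1, 14, tzinfo=timezone.utc),
     "event_type": "identify", "user_id": "u1"},  # no event_name → None
]
plan = plan_batch(events)
# Two partitions, one file each; the identify row has event_name=None.
```

In the real store, each row list would become a DataFrame serialized with PyArrow before upload.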

**Why GCS + BigQuery?**
- **Cost**: GCS Standard ($0.020/GB/month) → BigQuery long-term ($0.010/GB/month)
- **Flexibility**: Raw events for reprocessing, custom pipelines
- **Scalability**: Proven at petabyte scale (PostHog, RudderStack, Snowplow)
- **Queryability**: BigQuery's SQL engine for analytics
- **Pluggable**: Easy to add Snowflake, Redshift, etc. via `WarehouseLoader` protocol
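The tenacity-based retry described earlier (exponential backoff, max 3 attempts) can be approximated with the standard library alone. This is a sketch of the behavior, not eventkit's code: the flaky upload function is a stand-in, and `ConnectionError` stands in for GCS's `ServiceUnavailable`.

```python
import time

def retry_with_backoff(fn, attempts: int = 3, base: float = 1.0, cap: float = 10.0):
    """Call fn, retrying transient failures with exponential backoff.

    Mirrors tenacity's stop_after_attempt(3) + wait_exponential(min=1, max=10).
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except ConnectionError:        # stand-in for ServiceUnavailable
            if attempt == attempts:
                raise                  # retries exhausted: fail loudly
            time.sleep(min(cap, base * 2 ** (attempt - 1)))

calls = {"n": 0}
def flaky_upload():
    """Fails twice, then succeeds — simulates a transient GCS hiccup."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient GCS failure")
    return "uploaded"

result = retry_with_backoff(flaky_upload, base=0.01)  # succeeds on third try
```

Non-retriable errors (anything other than the transient exception type) propagate immediately, which is the "fail fast" behavior noted above.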

---

#### BigQueryLoader
**File:** `src/eventkit/loaders/bigquery_loader.py`

Background service that polls GCS for new Parquet files and loads them to BigQuery.

**Key Responsibilities:**
1. **Poll GCS** - List new `.parquet` files every 5 minutes (configurable)
2. **Filter loaded** - Query `_loaded_files` metadata table to skip duplicates
3. **Batch load** - Create BigQuery load jobs from GCS URIs
4. **Track metadata** - Record loaded files for idempotency

**Lifecycle:**
```python
loader = BigQueryLoader(bucket, dataset, table, project_id, poll_interval=300.0)
await loader.start() # Runs in background
# ... application runs ...
await loader.stop() # Graceful shutdown
```

**Why separate service?**
- **Independent scaling**: API and loader scale independently
- **Latency tolerance**: Batch loading accepts 5-10 minute delay
- **Resource isolation**: Loading doesn't impact API performance
- **Deployment flexibility**: Run as Cloud Run scheduled job, Kubernetes CronJob, or embedded

**Idempotency:**
- Metadata table tracks loaded files: `_loaded_files(file_path, loaded_at, row_count)`
- Prevents duplicate loads if loader restarts
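A sketch of the idempotency check: on each poll, files already recorded in the metadata table are skipped. The in-memory set here stands in for a query against `_loaded_files`; the helper name is illustrative, not eventkit's API.

```python
def files_to_load(discovered: list[str], already_loaded: set[str]) -> list[str]:
    """Return Parquet files not yet recorded in the _loaded_files table."""
    return [p for p in discovered
            if p.endswith(".parquet") and p not in already_loaded]

discovered = [
    "date=2026-01-13/a.parquet",
    "date=2026-01-13/b.parquet",
    "date=2026-01-13/_manifest.json",   # non-Parquet object: ignored
]
already_loaded = {"date=2026-01-13/a.parquet"}  # stand-in for metadata query
pending = files_to_load(discovered, already_loaded)
# → ["date=2026-01-13/b.parquet"]
```

Because the metadata check happens before each load job, a restarted loader re-discovers the same GCS objects but loads only the ones it has not recorded.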

---

#### WarehouseLoader Protocol
**File:** `src/eventkit/loaders/warehouse_loader.py`

Pluggable protocol for loading events to different data warehouses.

```python
class WarehouseLoader(Protocol):
async def start(self) -> None:
"""Start background loading process."""

async def stop(self) -> None:
"""Stop background loading process."""

async def load_files(self, file_paths: list[str]) -> None:
"""Load specific files (for manual triggering)."""
```
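A minimal in-memory class satisfying the protocol can serve as a test double. This is illustrative only — not part of eventkit — and it polls a local list of paths instead of GCS:

```python
import asyncio

class InMemoryLoader:
    """Toy WarehouseLoader: polls a local queue of paths instead of GCS."""

    def __init__(self, poll_interval: float = 0.01) -> None:
        self.poll_interval = poll_interval
        self.pending: list[str] = []
        self.loaded: list[str] = []
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        """Start the background polling task."""
        self._task = asyncio.create_task(self._poll_loop())

    async def stop(self) -> None:
        """Cancel the polling task and wait for it to exit."""
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def load_files(self, file_paths: list[str]) -> None:
        """Record the files as loaded (a real loader would run load jobs)."""
        self.loaded.extend(file_paths)

    async def _poll_loop(self) -> None:
        while True:
            if self.pending:
                batch, self.pending = self.pending, []
                await self.load_files(batch)
            await asyncio.sleep(self.poll_interval)

async def demo() -> list[str]:
    loader = InMemoryLoader()
    await loader.start()
    loader.pending.append("date=2026-01-13/a.parquet")
    await asyncio.sleep(0.05)      # let the poller pick it up
    await loader.stop()
    return loader.loaded

loaded = asyncio.run(demo())
```

Because the protocol is structural, this class needs no inheritance from `WarehouseLoader` — it only has to provide `start`, `stop`, and `load_files`.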

**Why Firestore?**
- Serverless (no cluster management)
- Strong consistency
- Good for moderate throughput (10K events/sec per stream)
- Free tier for development
- GCP-native (aligns with Cloud Run deployment)
**Implementations:**
- `BigQueryLoader` - Default GCS → BigQuery
- **Future:** `SnowflakeLoader`, `RedshiftLoader`, `ClickHouseLoader`

**Why protocol-based?**
- Same interface for all warehouses
- Users bring their own warehouse
- Easy to test (mock loaders)

---

#### FirestoreEventStore (Development/Testing)
**File:** `src/eventkit/stores/firestore.py`

Managed NoSQL storage for development and testing environments.

**Why Firestore for dev?**
- Emulator support (no GCP account needed)
- Fast local development
- Good for moderate throughput
- Free tier

**Future:** Pluggable backends (ClickHouse for analytics, BigQuery for data warehouse).
**Not recommended for production analytics** due to:
- Higher costs at scale
- Limited query capabilities (no SQL)
- Not designed for analytical workloads

---

48 changes: 43 additions & 5 deletions LOCAL_DEV.md
@@ -15,9 +15,19 @@ docker compose up -d
```

This starts:
- **Firestore emulator** on `localhost:8080` (for event/error storage)
- **Firestore emulator** on `localhost:8080` (for Firestore storage mode)
- **Pub/Sub emulator** on `localhost:8085` (for distributed queue mode)

**For GCS + BigQuery mode**, you can run a GCS emulator:

```bash
docker run -d -p 9023:9023 --name gcs-emulator \
fsouza/fake-gcs-server -scheme http
export STORAGE_EMULATOR_HOST=http://localhost:9023
```

See `tests/integration/README.md` for full emulator setup.

### 2. Install Dependencies

```bash
@@ -26,22 +36,40 @@ uv sync

### 3. Run the API Server

**Option A: Async Queue Mode (default, in-process workers + ring buffer)**
**Option A: GCS + BigQuery Mode (production pattern with emulator)**
```bash
export STORAGE_EMULATOR_HOST="http://localhost:9023"
export GCP_PROJECT_ID="test-project"
export GCP_GCS_BUCKET="test-events"
export GCP_BIGQUERY_DATASET="events"
export GCP_BIGQUERY_TABLE="raw_events"
export EVENTKIT_EVENT_STORE="gcs"
export EVENTKIT_WAREHOUSE_ENABLED="true"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000
```

**Option B: Firestore Mode (fast local development)**
```bash
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000
```

**Option B: Pub/Sub Queue Mode (distributed workers + ring buffer)**
**Option C: Pub/Sub Queue Mode (distributed workers)**
```bash
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="pubsub"
export EVENTKIT_PUBSUB_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"
Expand Down Expand Up @@ -126,6 +154,14 @@ See `src/eventkit/config.py` for all available settings.
| `GCP_PROJECT_ID` | *required* | GCP project ID |
| `FIRESTORE_EMULATOR_HOST` | - | Firestore emulator address (e.g., `localhost:8080`) |
| `PUBSUB_EMULATOR_HOST` | - | Pub/Sub emulator address (e.g., `localhost:8085`) |
| `STORAGE_EMULATOR_HOST` | - | GCS emulator address (e.g., `http://localhost:9023`) |
| **Storage Mode** |||
| `EVENTKIT_EVENT_STORE` | `"gcs"` | Storage backend: `gcs`, `firestore` |
| `GCP_GCS_BUCKET` | *required for GCS* | GCS bucket name for event storage |
| `GCP_BIGQUERY_DATASET` | *required for GCS* | BigQuery dataset name |
| `GCP_BIGQUERY_TABLE` | *required for GCS* | BigQuery table name |
| `EVENTKIT_WAREHOUSE_ENABLED` | `true` | Enable background warehouse loader |
| `EVENTKIT_WAREHOUSE_LOADER_INTERVAL` | `300.0` | Seconds between loader polls (5 min) |
| **Queue Mode** |||
| `EVENTKIT_QUEUE_MODE` | `"async"` | Queue mode: `async`, `pubsub` |
| `EVENTKIT_ASYNC_WORKERS` | `4` | Number of async workers (async mode) |
@@ -139,9 +175,11 @@ See `src/eventkit/config.py` for all available settings.
| `EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL` | `0.1` | Seconds between ring buffer polls |
| `EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL` | `3600.0` | Seconds between cleanup runs (1 hour) |
| **EventLoader** |||
| `EVENTKIT_BUFFER_SIZE` | `100` | Events per partition before flush |
| `EVENTKIT_EVENTLOADER_BATCH_SIZE` | *adaptive* | Events per batch (1000 for GCS, 100 for Firestore) |
| `EVENTKIT_EVENTLOADER_FLUSH_INTERVAL` | *adaptive* | Flush interval seconds (60 for GCS, 5 for Firestore) |
| `EVENTKIT_BUFFER_SIZE` | `100` | Events per partition before flush (deprecated) |
| `EVENTKIT_BUFFER_MAX_SIZE` | `1000` | Hard limit per partition |
| `EVENTKIT_BUFFER_TIMEOUT` | `5.0` | Max seconds before flush |
| `EVENTKIT_BUFFER_TIMEOUT` | `5.0` | Max seconds before flush (deprecated) |
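The adaptive EventLoader defaults in the table above can be summarized in one helper. This is an illustrative sketch of the resolution rule (the function name is an assumption; the actual logic lives somewhere in `src/eventkit/config.py`):

```python
def eventloader_defaults(event_store: str) -> tuple[int, float]:
    """Adaptive EventLoader defaults: (batch_size, flush_interval_seconds)."""
    if event_store == "gcs":
        return 1000, 60.0   # bigger batches → fewer, larger Parquet files
    return 100, 5.0         # firestore: small, low-latency batches

batch_size, flush_interval = eventloader_defaults("gcs")
```

Explicit `EVENTKIT_EVENTLOADER_BATCH_SIZE` / `EVENTKIT_EVENTLOADER_FLUSH_INTERVAL` values override these adaptive defaults.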

### Ring Buffer (Durability Layer)
