prosdev commented on Jan 11, 2026

What This PR Does

Implements Issue #5 (Buffer) with per-partition buffering, size-based and time-based flushing, and pluggable storage backends.

Key Features

  • Pluggable Storage: BufferStorage protocol enables in-memory (default), disk, or Redis backends
  • Per-Partition Isolation: Each partition has its own buffer for isolation and ordering
  • Size-Based Flush: Automatically flushes when buffer reaches configured size (default: 100 events)
  • Time-Based Flush: A background task flushes all partitions every `timeout` seconds (default: 5s)
  • Graceful Shutdown: stop_flusher() ensures no events are lost on shutdown
  • Defensive Max Size: Prevents unbounded memory growth with a max_size limit (default: 1000 events/partition)
  • Retry-Flush Strategy: Attempts one flush before raising BufferFullError (production CDP pattern)
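
Roughly, usage looks like this. The class, method, and error names below are the ones from this PR; the import paths, constructor keywords, and the on_flush hook are illustrative placeholders, so the real signatures may differ:

```python
import asyncio

# Assumed import paths; the real module layout may differ.
from eventkit.buffer import Buffer, BufferFullError
from eventkit.buffer_storage import InMemoryBufferStorage


async def send_batch(partition: str, events: list[dict]) -> None:
    print(f"flushing {len(events)} events from {partition}")


async def main() -> None:
    buffer = Buffer(
        storage=InMemoryBufferStorage(),  # pluggable BufferStorage backend
        flush_size=100,                   # size-based flush threshold (placeholder keyword)
        flush_timeout=5.0,                # time-based flush interval in seconds (placeholder keyword)
        max_size=1000,                    # defensive per-partition cap
        on_flush=send_batch,              # placeholder downstream hook
    )
    await buffer.start_flusher()          # background time-based flushing
    try:
        await buffer.enqueue("user-123", {"type": "page_view"})
    except BufferFullError:
        # Raised only when the partition is at max_size and the retry flush fails.
        raise
    finally:
        await buffer.stop_flusher()       # graceful shutdown: final flush, no data loss


asyncio.run(main())
```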

Commits

  1. feat(buffer): add Buffer with pluggable storage and retry-flush

    • Create BufferStorage protocol and InMemoryBufferStorage
    • Implement Buffer class with enqueue() and size-based flush
    • Add retry-flush-before-failing strategy
    • Add BufferFullError for defensive max_size limit
    • Comprehensive tests (97% coverage on buffer.py, 95% on buffer_storage.py)
  2. feat(buffer): add time-based flush and graceful shutdown

    • Implement start_flusher() for background task
    • Implement _run_flusher() for periodic flushing
    • Implement flush_all() to flush all partitions
    • Implement stop_flusher() with graceful shutdown (no data loss)
    • Add tests for time-based flush, shutdown, and concurrency
    • All 11 tests passing (96% coverage)
  3. docs: mark Task 8 complete in tasks.md

    • Update tasks.md to reflect completion

Test Coverage

  • 11 unit tests passing
  • 96% coverage on buffer.py
  • 100% coverage on buffer_storage.py
  • Tests cover: initialization, enqueue, size-based flush, time-based flush, graceful shutdown, concurrency, error handling, per-partition isolation
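
To give a sense of the shape of these tests, a size-based flush case looks roughly like the following (illustrative only, not copied from the test suite; it reuses the placeholder constructor keywords from the usage sketch above and requires pytest-asyncio):

```python
import pytest

from eventkit.buffer import Buffer               # assumed import path
from eventkit.buffer_storage import InMemoryBufferStorage


@pytest.mark.asyncio
async def test_size_based_flush_triggers_at_threshold():
    flushed: list[list[dict]] = []

    async def capture(partition: str, events: list[dict]) -> None:
        flushed.append(events)

    buffer = Buffer(
        storage=InMemoryBufferStorage(),
        flush_size=3,                # small threshold to keep the test fast
        on_flush=capture,            # placeholder flush hook
    )

    for i in range(3):
        await buffer.enqueue("p1", {"seq": i})

    assert len(flushed) == 1                          # exactly one flush at the threshold
    assert [e["seq"] for e in flushed[0]] == [0, 1, 2]  # all buffered events delivered in order
```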

Design Decisions

See notes/projects/eventkit-impl/005-buffer-batching.md for detailed analysis:

  • Why per-partition buffers (vs. single buffer)
  • Why asyncio.Lock is necessary
  • Why background task for time-based flush
  • Pluggable storage design
  • Max buffer size strategy
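
For a concrete picture of the pluggable-storage seam, the protocol might look roughly like the sketch below. Only the BufferStorage / InMemoryBufferStorage split and the Protocol-based design are taken from this PR; the specific method names are placeholders.

```python
from typing import Protocol


class BufferStorage(Protocol):
    """Storage backend for per-partition event buffers (placeholder method names)."""

    def append(self, partition: str, event: dict) -> None: ...
    def get(self, partition: str) -> list[dict]: ...
    def clear(self, partition: str) -> None: ...
    def partitions(self) -> list[str]: ...


class InMemoryBufferStorage:
    """Default backend: fast, volatile, lost on process restart."""

    def __init__(self) -> None:
        self._data: dict[str, list[dict]] = {}

    def append(self, partition: str, event: dict) -> None:
        self._data.setdefault(partition, []).append(event)

    def get(self, partition: str) -> list[dict]:
        return list(self._data.get(partition, []))

    def clear(self, partition: str) -> None:
        self._data.pop(partition, None)

    def partitions(self) -> list[str]:
        return list(self._data)
```

A disk or Redis backend then only has to implement the same methods to be swapped in.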

Closes #5

From commit 1 (pluggable storage and retry-flush):
- Create BufferStorage protocol for pluggable backends
- Implement InMemoryBufferStorage (default, fast, volatile)
- Create Buffer class with per-partition buffering
- Implement enqueue() with size-based flush (default: 100 events)
- Add retry-flush-before-failing strategy
  - If buffer at max_size, try flushing first
  - Only raise BufferFullError if flush fails
- Add BufferFullError for defensive max_size limit (default: 1000)
- Add asyncio.Lock to prevent race conditions
- Add comprehensive tests for enqueue, flush, isolation, and error cases
- Coverage: 97% on buffer.py, 95% on buffer_storage.py
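
Illustratively, the retry-flush path looks something like the standalone sketch below (simplified: the real Buffer goes through its BufferStorage backend instead of inlined lists, and names like on_flush are placeholders):

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable


class BufferFullError(Exception):
    """Partition buffer hit max_size and the retry flush could not drain it."""


class RetryFlushBuffer:
    def __init__(
        self,
        on_flush: Callable[[str, list[dict]], Awaitable[None]],
        flush_size: int = 100,
        max_size: int = 1000,
    ) -> None:
        self._on_flush = on_flush
        self._flush_size = flush_size
        self._max_size = max_size
        self._buffers: dict[str, list[dict]] = defaultdict(list)
        self._lock = asyncio.Lock()  # makes the check-and-flush below atomic

    async def enqueue(self, partition: str, event: dict) -> None:
        async with self._lock:
            if len(self._buffers[partition]) >= self._max_size:
                # Retry-flush: attempt one flush before failing.
                try:
                    await self._flush(partition)
                except Exception as exc:
                    raise BufferFullError(
                        f"{partition}: buffer full and retry flush failed"
                    ) from exc
            self._buffers[partition].append(event)
            if len(self._buffers[partition]) >= self._flush_size:
                await self._flush(partition)  # normal size-based flush

    async def _flush(self, partition: str) -> None:
        events = self._buffers[partition]
        if events:
            await self._on_flush(partition, events)  # buffer left intact if this raises
            self._buffers[partition] = []
```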

Design decisions:
- Pluggable storage enables disk, Redis, or custom backends
- Retry-flush follows production CDP patterns (fail fast with one retry)
- Protocol-based design matches EventStore and Sequencer patterns

From commit 2 (time-based flush and graceful shutdown):
- Implement start_flusher() to start the background task
- Implement _run_flusher() for periodic flushing every timeout seconds
- Implement flush_all() to flush all partitions
- Implement stop_flusher() with graceful shutdown
  - Signals shutdown
  - Waits for flusher task (with timeout)
  - Final flush to ensure no data loss
- Add comprehensive tests for time-based flush, shutdown, and concurrency
- Coverage: 96% on buffer.py, 100% on buffer_storage.py
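
The flusher loop and shutdown sequence follow a standard asyncio pattern, sketched standalone below (illustrative; the real methods live on Buffer and call its flush_all()):

```python
import asyncio
from typing import Awaitable, Callable


class PeriodicFlusher:
    """Standalone sketch of the background-flush / graceful-shutdown pattern."""

    def __init__(self, flush_all: Callable[[], Awaitable[None]], flush_timeout: float = 5.0) -> None:
        self._flush_all = flush_all          # e.g. Buffer.flush_all
        self._flush_timeout = flush_timeout
        self._shutdown = asyncio.Event()
        self._task: asyncio.Task | None = None

    async def start_flusher(self) -> None:
        self._task = asyncio.create_task(self._run_flusher())

    async def _run_flusher(self) -> None:
        # Wake up every flush_timeout seconds (or immediately on shutdown) and flush.
        while not self._shutdown.is_set():
            try:
                await asyncio.wait_for(self._shutdown.wait(), timeout=self._flush_timeout)
            except asyncio.TimeoutError:
                await self._flush_all()

    async def stop_flusher(self) -> None:
        self._shutdown.set()                                        # 1. signal shutdown
        if self._task is not None:
            try:
                await asyncio.wait_for(self._task, timeout=self._flush_timeout)
            except asyncio.TimeoutError:
                self._task.cancel()                                 # 2. give up on a stuck flush
        await self._flush_all()                                     # 3. final flush so nothing is lost
```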

All 11 tests passing
All acceptance criteria met:
- Per-partition buffers with pluggable storage
- Size-based flush (default: 100 events)
- Time-based flush (default: 5 seconds)
- Async background flusher task
- Graceful shutdown flushes all buffers
- Comprehensive unit tests (11 tests, 96% coverage)
prosdev merged commit 1b69e26 into main on Jan 11, 2026 (1 check passed)