[Feature] Unified Local State Storage Backend for Stateful Operators #220

@luoluoyuyu

📝 Background & Motivation

Currently, stateful operators in FunctionStream (like Windowing, Joins, and Aggregations) rely entirely on internal, purely in-memory data structures. As data volumes and workload complexity grow, this approach shows serious limitations:

  • High OOM Risk: Unbounded memory growth during long windows or traffic spikes.
  • No Spill-to-Disk: Inability to temporarily offload cold data to disk when memory is low.
  • Difficult Checkpointing: In-memory intermediate states are hard to serialize, hindering true fault tolerance.
  • High GC Pressure: Frequent creation and deletion of small data batches cause severe memory fragmentation.

We need a unified, memory-controlled local state engine to completely take over state management, ensuring system stability and enabling large-scale data processing.


✨ Key Features & Capabilities

1. Tiered Storage & Anti-OOM Mechanism

The engine will introduce a global memory controller. It will automatically manage data across multiple tiers:

  • Keep active and frequently accessed data in memory.
  • Automatically and transparently spill older, inactive data to disk (as Parquet files) when memory usage reaches predefined safety thresholds, so that state size stays bounded and Out-Of-Memory crashes are avoided.
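
A minimal sketch of the spill behavior described above, assuming a byte budget and least-recently-used eviction. The names `TieredStore` and `budget_bytes` are hypothetical, and a plain dict stands in for the Parquet files on disk; the actual engine may pick cold data differently.

```python
from collections import OrderedDict
from typing import Optional


class TieredStore:
    """Toy two-tier store: hot entries in memory, cold entries 'on disk'."""

    def __init__(self, budget_bytes: int) -> None:
        self.budget_bytes = budget_bytes  # hypothetical safety threshold
        self.used_bytes = 0               # bytes currently held in memory
        self.memory: "OrderedDict[str, bytes]" = OrderedDict()  # LRU order, oldest first
        self.disk: dict = {}              # stand-in for Parquet files on disk

    def put(self, key: str, value: bytes) -> None:
        self.disk.pop(key, None)          # drop any stale spilled copy
        old = self.memory.pop(key, None)
        if old is not None:
            self.used_bytes -= len(old)
        self.memory[key] = value          # newest entry goes to the end
        self.used_bytes += len(value)
        self._spill_if_needed()

    def get(self, key: str) -> Optional[bytes]:
        if key in self.memory:
            self.memory.move_to_end(key)  # mark as recently used
            return self.memory[key]
        if key in self.disk:
            value = self.disk.pop(key)
            self.put(key, value)          # reload into the hot tier
            return value
        return None

    def _spill_if_needed(self) -> None:
        # Move least recently used entries to disk until under budget.
        # The newest entry always stays in memory.
        while self.used_bytes > self.budget_bytes and len(self.memory) > 1:
            cold_key, cold_value = self.memory.popitem(last=False)
            self.disk[cold_key] = cold_value
            self.used_bytes -= len(cold_value)
```

With a 10-byte budget, writing three 4-byte values pushes the oldest one to the disk tier; reading it back reloads it and spills the next-coldest entry instead.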

2. Unified Operator Abstraction

The engine will provide a standardized read/write interface for all stateful operators. Operators will no longer manage their own complex data maps; they will simply read from and write to the state engine using a unified indexing standard.
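
To make the idea concrete, here is a rough sketch of what such an interface could look like. Everything in it is an assumption for illustration: the `StateKey` fields, the `StateBackend` method names, and the `CountAggregator` operator are hypothetical and not part of FunctionStream today.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, Optional, Protocol, Tuple


@dataclass(frozen=True, order=True)
class StateKey:
    """Hypothetical unified index: operator, partition key, window start."""
    operator_id: str
    partition_key: str
    window_start: int


class StateBackend(Protocol):
    """Hypothetical read/write contract shared by all stateful operators."""
    def get(self, key: StateKey) -> Optional[bytes]: ...
    def put(self, key: StateKey, value: bytes) -> None: ...
    def scan(self, operator_id: str) -> Iterator[Tuple[StateKey, bytes]]: ...


class InMemoryBackend:
    """Simplest possible backend, used here only to exercise the interface."""

    def __init__(self) -> None:
        self._data: Dict[StateKey, bytes] = {}

    def get(self, key: StateKey) -> Optional[bytes]:
        return self._data.get(key)

    def put(self, key: StateKey, value: bytes) -> None:
        self._data[key] = value

    def scan(self, operator_id: str) -> Iterator[Tuple[StateKey, bytes]]:
        # Yield this operator's entries in key order.
        for key, value in sorted(self._data.items()):
            if key.operator_id == operator_id:
                yield key, value


class CountAggregator:
    """Example operator that keeps no private map; all state lives in the backend."""

    def __init__(self, operator_id: str, state: StateBackend) -> None:
        self.operator_id = operator_id
        self.state = state

    def on_event(self, partition_key: str, window_start: int) -> int:
        key = StateKey(self.operator_id, partition_key, window_start)
        current = self.state.get(key)
        count = int.from_bytes(current, "big") if current else 0
        count += 1
        self.state.put(key, count.to_bytes(8, "big"))
        return count
```

The operator depends only on the `StateBackend` contract, so swapping the in-memory backend for a tiered one would not require changes to operator code.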

3. Native Arrow & Parquet Performance

State data will remain in standard Arrow format in memory and Parquet format on disk. Both are columnar, so spilling and reloading avoid conversion to a separate state-specific format, which keeps read/write overhead low.

4. Tombstone Deletion Mechanism

To handle rapid state cleanups (e.g., when a time window closes), the engine will use a logical "Tombstone" deletion mechanism. Instead of performing expensive disk rewrites, expired data is instantly marked as deleted and filtered out during subsequent reads.
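
A small sketch of the tombstone idea, assuming deletions are tracked per window ID. `TombstoneStore` and its methods are hypothetical names, and the append-only list stands in for immutable files on disk.

```python
from typing import List, Set, Tuple


class TombstoneStore:
    """Toy append-only store with logical deletes tracked per window ID."""

    def __init__(self) -> None:
        self.rows: List[Tuple[int, str]] = []  # (window_id, payload), never rewritten
        self.tombstones: Set[int] = set()      # window IDs marked as deleted

    def append(self, window_id: int, payload: str) -> None:
        self.rows.append((window_id, payload))

    def delete_window(self, window_id: int) -> None:
        # Constant-time logical delete: no stored row is touched.
        self.tombstones.add(window_id)

    def read(self) -> List[Tuple[int, str]]:
        # Tombstoned rows are filtered out at read time.
        return [row for row in self.rows if row[0] not in self.tombstones]
```

In this simplified version, rows appended to a window after it was tombstoned are also hidden; a real engine would likely need sequence numbers or epochs to tell old data from new data.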

5. Lightweight Checkpointing (No WAL)

Checkpointing will rely on epoch-based snapshots to synchronize memory and disk state, avoiding the heavy write penalties associated with a traditional Write-Ahead Log (WAL).
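
One way to picture epoch-based snapshots without a WAL is sketched below. It assumes that writes made after the last completed epoch are simply discarded on recovery and replayed from the source; that recovery model, along with the `EpochState` name and its methods, is an assumption for illustration only.

```python
import copy
from typing import Any, Dict


class EpochState:
    """Toy state store that snapshots itself at epoch boundaries."""

    def __init__(self) -> None:
        self.epoch = 0
        self.live: Dict[str, Any] = {}                  # mutable working state
        self.snapshots: Dict[int, Dict[str, Any]] = {}  # stand-in for durable snapshot files

    def put(self, key: str, value: Any) -> None:
        # No per-write log entry: only the live state is updated.
        self.live[key] = value

    def checkpoint(self) -> int:
        # Close the current epoch and capture a consistent copy of the state.
        self.epoch += 1
        self.snapshots[self.epoch] = copy.deepcopy(self.live)
        return self.epoch

    def restore(self, epoch: int) -> None:
        # Roll back to a completed epoch; later writes are lost and must be replayed.
        self.live = copy.deepcopy(self.snapshots[epoch])
        self.epoch = epoch
```

The trade-off is that durability is only guaranteed at epoch boundaries, in exchange for skipping a log write on every state update.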


📋 Implementation Roadmap

We plan to roll out this feature in 4 high-level phases:

  • Phase 1: Core Engine & Memory Control
    Build the global memory controller, the unified state indexing system, and the tombstone registry for logical deletions.

  • Phase 2: Disk I/O & Serialization
    Implement the logic to seamlessly serialize in-memory data to Parquet files and asynchronously load it back when requested.

  • Phase 3: Operator Migration
    Refactor existing stateful operators (Joins, Windows, Aggregations) to drop their internal buffers and fully integrate with the new state engine.

  • Phase 4: Background Cleanup (Compaction)
    Implement background tasks to merge fragmented historical disk files and physically remove data marked by tombstones.
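
The compaction step in Phase 4 could look roughly like the sketch below. It assumes each file is a list of `(window_id, payload)` rows and that tombstones are window IDs; the `compact` function is a hypothetical illustration, not a proposed API.

```python
from typing import List, Set, Tuple

Row = Tuple[int, str]  # (window_id, payload)


def compact(files: List[List[Row]], tombstones: Set[int]) -> Tuple[List[Row], Set[int]]:
    """Merge small files into one and physically drop tombstoned rows.

    Returns the merged file and the tombstones that are still needed.
    """
    merged = [row for rows in files for row in rows if row[0] not in tombstones]
    # Keep rows grouped by window; the sort is stable, so arrival order
    # within a window is preserved.
    merged.sort(key=lambda row: row[0])
    # Every input file was rewritten, so no tombstone is needed any more.
    # This only holds when all files that may contain the data are compacted together.
    return merged, set()
```

After compaction, reads no longer need to filter against the tombstone registry for the rewritten files.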
