Skip to content

Commit 4ee5e0c

Browse files
committed
update
1 parent 1f6ba4d commit 4ee5e0c

File tree

5 files changed

+117
-40
lines changed

5 files changed

+117
-40
lines changed

docs/Go-SDK/go-sdk-guide-zh.md

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,18 @@ if err != nil {
301301

302302
---
303303

304-
## 七、目录结构参考
304+
## 七、高级状态 API(进阶文档)
305305

306-
**go-sdk**
306+
本指南仅覆盖**低阶 go-sdk**(Driver、Context、Store、目录结构)。**高级状态 API**(Codec、ValueState、ListState、MapState、PriorityQueueState、AggregatingState、ReducingState、Keyed\* 工厂与用法)由独立库 **go-sdk-advanced** 提供,完整说明、Codec 约定、构造函数表与示例均在进阶文档中:
307+
308+
- **[Go SDK — 高级状态 API](go-sdk-advanced-state-api-zh.md)**(中文)
309+
- [Go SDK — Advanced State API](go-sdk-advanced-state-api.md)(英文)
310+
311+
---
312+
313+
## 八、目录结构参考
314+
315+
**低阶库 go-sdk**
307316

308317
```text
309318
go-sdk/
@@ -325,4 +334,14 @@ go-sdk/
325334
└── bindings/ # wit-bindgen-go 生成的 Go 代码(make bindings)
326335
```
327336

337+
**高阶库 go-sdk-advanced**(依赖 go-sdk,含 Codec 与全部状态类型):
338+
339+
```text
340+
go-sdk-advanced/
341+
├── go.mod # require go-sdk
342+
├── codec/ # Codec[T]、DefaultCodecFor、内置与 JSON codec
343+
├── structures/ # ValueState、ListState、MapState、PriorityQueue、Aggregating、Reducing
344+
└── keyed/ # Keyed 状态工厂(value、list、map、PQ、aggregating、reducing)
345+
```
346+
328347
更多示例与 SQL 操作见 [examples/go-processor/README.md](../../examples/go-processor/README.md)[SQL CLI 指南](../sql-cli-guide-zh.md)

docs/Python-SDK/python-sdk-advanced-state-api-zh.md

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
# Python SDK — 高级状态 API
2323

24-
本文档介绍 Python SDK 的**高级状态 API**:基于底层 KvStore 的带类型状态抽象(ValueState、ListState、MapState 等),通过 **codec** 序列化,并支持按主键的 **keyed state**设计与 [Go SDK 高级状态 API](../Go-SDK/go-sdk-guide.md#7-advanced-state-api) 对齐。
24+
本文档介绍 Python SDK 的**高级状态 API**:基于底层 KvStore 的带类型状态抽象(ValueState、ListState、MapState 等),通过 **codec** 序列化,并支持按主键的 **keyed state**
2525

2626
**两个独立库:** 高级状态 API 由 **functionstream-api-advanced** 提供,依赖低阶 **functionstream-api**。安装:`pip install functionstream-api functionstream-api-advanced`。使用时从 `fs_api_advanced` 导入 Codec、ValueState、ListState、MapState 等。
2727

@@ -34,7 +34,7 @@
3434

3535
## 1. 概述
3636

37-
当需要结构化状态(单值、列表、Map、优先队列、聚合、归约)而不想手写字节编码或 key 布局时,可使用高级状态 API。创建方式有两种:通过**运行时的 Context**(如使用 functionstream-runtime 时 `ctx.getOrCreateValueState(...)`)或通过状态类型上的**类型级构造方法**(推荐,便于复用,与 Go SDK 用法一致)。
37+
当需要结构化状态(单值、列表、Map、优先队列、聚合、归约)而不想手写字节编码或 key 布局时,可使用高级状态 API。创建方式有两种:通过**运行时的 Context**(如使用 functionstream-runtime 时 `ctx.getOrCreateValueState(...)`)或通过状态类型上的**类型级构造方法**(推荐,便于复用)。
3838

3939
---
4040

@@ -44,7 +44,7 @@
4444

4545
使用 **functionstream-api-advanced** 时,运行时的 Context 实现(如 functionstream-runtime 的 WitContext)会提供 `getOrCreateValueState(store_name, codec)``getOrCreateValueStateAutoCodec(store_name)` 以及 ListState、MapState、PriorityQueueState、AggregatingState、ReducingState 与所有 Keyed\* 工厂的对应方法,内部委托给下面所述的类型级 `from_context` / `from_context_auto_codec`
4646

47-
### 2.2 通过状态类型(推荐,与 Go SDK 一致
47+
### 2.2 通过状态类型(推荐)
4848

4949
每种状态类型和 keyed 工厂提供:
5050

@@ -95,9 +95,15 @@
9595

9696
也可使用 Context 的 `ctx.getOrCreateKeyed*Factory(...)` 方法,其内部会委托给上述构造方法。
9797

98+
### 4.3 KeyedValueState
99+
100+
KeyedValueState 只需 **value codec**,不要求有序。工厂创建状态:`factory.new_keyed_value(primary_key, state_name="")`,得到 `KeyedValueState[V]`。状态方法:`update(value)``value()`(返回 `Optional[V]`)、`clear()`。主键由创建时传入的 `primary_key`(bytes)固定。
101+
98102
---
99103

100-
## 5. 示例:使用 from_context_auto_codec 的 ValueState
104+
## 5. 示例
105+
106+
### 5.1 ValueState(from_context_auto_codec)
101107

102108
**fs_api_advanced** 导入 ValueState(Codec、ListState、MapState 等同此包):
103109

@@ -108,18 +114,41 @@ from fs_api_advanced import ValueState
108114
class CounterProcessor(FSProcessorDriver):
109115
def process(self, ctx: Context, source_id: int, data: bytes):
110116
state = ValueState.from_context_auto_codec(ctx, "my-store")
111-
cur, found = state.value()
112-
if not found or cur is None:
117+
cur = state.value()
118+
if cur is None:
119+
cur = 0
120+
state.update(cur + 1)
121+
ctx.emit(str(cur + 1).encode(), 0)
122+
```
123+
124+
### 5.2 KeyedValueState(keyed 算子)
125+
126+
流按 key 分区时,在 `init` 中创建工厂,在 `process` 中按当前记录的 `primary_key` 取状态,再 `update(value)` / `value()` / `clear()`
127+
128+
```python
129+
from fs_api import FSProcessorDriver, Context
130+
from fs_api_advanced import KeyedValueStateFactory
131+
132+
class KeyedCounterProcessor(FSProcessorDriver):
133+
def init(self, ctx: Context, config: dict):
134+
self._factory = KeyedValueStateFactory.from_context_auto_codec(
135+
ctx, "counters", b"", b"by_key", value_type=int
136+
)
137+
138+
def process(self, ctx: Context, source_id: int, data: bytes):
139+
primary_key = data[:8]
140+
state = self._factory.new_keyed_value(primary_key, "count")
141+
cur = state.value()
142+
if cur is None:
113143
cur = 0
114144
state.update(cur + 1)
115145
ctx.emit(str(cur + 1).encode(), 0)
116146
```
117147

118-
其他状态类型用法相同:按上表使用 `XxxState.from_context(ctx, store_name, ...)``XxxState.from_context_auto_codec(ctx, store_name)`
148+
其他状态类型按上表使用 `XxxState.from_context(ctx, store_name, ...)``XxxState.from_context_auto_codec(ctx, store_name)`
119149

120150
---
121151

122152
## 6. 参见
123153

124154
- [Python SDK 指南](python-sdk-guide-zh.md) — fs_api、fs_client 及 Context/KvStore 基础用法。
125-
- [Go SDK 指南 — 高级状态 API](../Go-SDK/go-sdk-guide.md#7-advanced-state-api) — Go SDK 中的等价 API。

docs/Python-SDK/python-sdk-advanced-state-api.md

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
# Python SDK — Advanced State API
2323

24-
This document describes the **high-level state API** for the Python SDK: typed state abstractions (ValueState, ListState, MapState, etc.) built on top of the low-level KvStore, with serialization via **codecs** and optional **keyed state** per primary key. The design aligns with the [Go SDK Advanced State API](../Go-SDK/go-sdk-guide.md#7-advanced-state-api).
24+
This document describes the **high-level state API** for the Python SDK: typed state abstractions (ValueState, ListState, MapState, etc.) built on top of the low-level KvStore, with serialization via **codecs** and optional **keyed state** per primary key.
2525

2626
**Two separate libraries:** The advanced state API is provided by **functionstream-api-advanced**, which depends on the low-level **functionstream-api**. Install with: `pip install functionstream-api functionstream-api-advanced`. Import Codec, ValueState, ListState, MapState, etc. from `fs_api_advanced`.
2727

@@ -34,7 +34,7 @@ This document describes the **high-level state API** for the Python SDK: typed s
3434

3535
## 1. Overview
3636

37-
Use the advanced state API when you need structured state (single value, list, map, priority queue, aggregation, reduction) without manual byte encoding or key layout. You can create state either from the **runtime Context** (e.g. `ctx.getOrCreateValueState(...)` when using functionstream-runtime) or via **type-level constructors** on the state class (recommended for clarity and reuse, same pattern as the Go SDK).
37+
Use the advanced state API when you need structured state (single value, list, map, priority queue, aggregation, reduction) without manual byte encoding or key layout. You can create state either from the **runtime Context** (e.g. `ctx.getOrCreateValueState(...)` when using functionstream-runtime) or via **type-level constructors** on the state class (recommended for clarity and reuse).
3838

3939
---
4040

@@ -44,7 +44,7 @@ Use the advanced state API when you need structured state (single value, list, m
4444

4545
When using **functionstream-api-advanced**, the runtime Context implementation (e.g. WitContext in functionstream-runtime) provides `getOrCreateValueState(store_name, codec)`, `getOrCreateValueStateAutoCodec(store_name)`, and the same pattern for ListState, MapState, PriorityQueueState, AggregatingState, ReducingState, and all Keyed\* factories; these delegate to the type-level `from_context` / `from_context_auto_codec` methods below.
4646

47-
### 2.2 From the state type (recommended, same as Go SDK)
47+
### 2.2 From the state type (recommended)
4848

4949
Each state type and keyed factory provides:
5050

@@ -95,9 +95,15 @@ All of the above can also be obtained via the corresponding `ctx.getOrCreate*` m
9595

9696
You can also use the corresponding `ctx.getOrCreateKeyed*Factory(...)` methods, which delegate to these constructors.
9797

98+
### 4.3 KeyedValueState
99+
100+
KeyedValueState requires only a **value codec**; no ordered codec. Create state from the factory with `factory.new_keyed_value(primary_key, state_name="")`, yielding `KeyedValueState[V]`. State methods: `update(value)`, `value()` (returns `Optional[V]`), `clear()`. The primary key is fixed at creation time via `primary_key` (bytes).
101+
98102
---
99103

100-
## 5. Example: ValueState with from_context_auto_codec
104+
## 5. Examples
105+
106+
### 5.1 ValueState (from_context_auto_codec)
101107

102108
Import ValueState from **fs_api_advanced** (Codec, ListState, MapState, etc. are in the same package):
103109

@@ -108,8 +114,32 @@ from fs_api_advanced import ValueState
108114
class CounterProcessor(FSProcessorDriver):
109115
def process(self, ctx: Context, source_id: int, data: bytes):
110116
state = ValueState.from_context_auto_codec(ctx, "my-store")
111-
cur, found = state.value()
112-
if not found or cur is None:
117+
cur = state.value()
118+
if cur is None:
119+
cur = 0
120+
state.update(cur + 1)
121+
ctx.emit(str(cur + 1).encode(), 0)
122+
```
123+
124+
### 5.2 KeyedValueState (keyed operator)
125+
126+
When the stream is partitioned by key, create the factory in `init` and obtain state per record’s `primary_key` in `process`, then use `update(value)` / `value()` / `clear()`:
127+
128+
```python
129+
from fs_api import FSProcessorDriver, Context
130+
from fs_api_advanced import KeyedValueStateFactory
131+
132+
class KeyedCounterProcessor(FSProcessorDriver):
133+
def init(self, ctx: Context, config: dict):
134+
self._factory = KeyedValueStateFactory.from_context_auto_codec(
135+
ctx, "counters", b"", b"by_key", value_type=int
136+
)
137+
138+
def process(self, ctx: Context, source_id: int, data: bytes):
139+
primary_key = data[:8]
140+
state = self._factory.new_keyed_value(primary_key, "count")
141+
cur = state.value()
142+
if cur is None:
113143
cur = 0
114144
state.update(cur + 1)
115145
ctx.emit(str(cur + 1).encode(), 0)
@@ -122,4 +152,3 @@ Same pattern for other state types: use `XxxState.from_context(ctx, store_name,
122152
## 6. See also
123153

124154
- [Python SDK Guide](python-sdk-guide.md) — main guide for fs_api, fs_client, and basic Context/KvStore usage.
125-
- [Go SDK Guide — Advanced State API](../Go-SDK/go-sdk-guide.md#7-advanced-state-api) — equivalent API in the Go SDK.

python/functionstream-api-advanced/src/fs_api_advanced/keyed/keyed_value_state.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
from fs_api.store import ComplexKey, KvError, KvStore
1616
from fs_api_advanced.codec import Codec, PickleCodec, default_codec_for
1717

18-
from ._keyed_common import ensure_ordered_key_codec
19-
20-
K = TypeVar("K")
2118
V = TypeVar("V")
2219

2320

@@ -64,65 +61,68 @@ def from_context_auto_codec(
6461
key_group: bytes,
6562
value_type: Optional[type] = None,
6663
) -> "KeyedValueStateFactory[V]":
67-
"""Create a KeyedValueStateFactory with default codec from context and store name."""
64+
"""Create a KeyedValueStateFactory with default value codec from context and store name."""
6865
store = ctx.getOrCreateKVStore(store_name)
6966
codec = default_codec_for(value_type) if value_type is not None else PickleCodec()
7067
return cls(store, namespace, key_group, codec)
7168

72-
def new_value(self, key_codec: Codec[K]) -> "KeyedValueState[K, V]":
73-
ensure_ordered_key_codec(key_codec, "keyed value")
69+
def new_keyed_value(self, primary_key: bytes, state_name: str = "") -> "KeyedValueState[V]":
7470
return KeyedValueState(
7571
self._store,
7672
self._namespace,
77-
key_codec,
78-
self._value_codec,
7973
self._key_group,
74+
self._value_codec,
75+
primary_key,
76+
state_name,
8077
)
8178

8279

83-
class KeyedValueState(Generic[K, V]):
80+
class KeyedValueState(Generic[V]):
8481
def __init__(
8582
self,
8683
store: KvStore,
8784
namespace: bytes,
88-
key_codec: Codec[K],
89-
value_codec: Codec[V],
9085
key_group: bytes,
86+
value_codec: Codec[V],
87+
primary_key: bytes,
88+
state_name: str = "",
9189
):
9290
if namespace is None:
9391
raise KvError("keyed value state namespace must not be None")
9492
if key_group is None:
9593
raise KvError("keyed value state key_group must not be None")
9694
if value_codec is None:
9795
raise KvError("keyed value state value_codec must not be None")
96+
if primary_key is None:
97+
raise KvError("keyed value state primary_key must not be None")
9898
self._store = store
9999
self._namespace = namespace
100100
self._key_group = key_group
101-
self._key_codec = key_codec
102101
self._value_codec = value_codec
103-
ensure_ordered_key_codec(key_codec, "keyed value")
102+
self._primary_key = primary_key
103+
self._state_name = state_name
104104

105-
def _build_ck(self, key: K) -> ComplexKey:
105+
def _build_ck(self) -> ComplexKey:
106106
return ComplexKey(
107107
key_group=self._key_group,
108-
key=self._key_codec.encode(key),
108+
key=self._primary_key,
109109
namespace=self._namespace,
110-
user_key=b"",
110+
user_key=self._state_name.encode("utf-8") if self._state_name else b"",
111111
)
112112

113-
def set(self, key: K, value: V) -> None:
114-
ck = self._build_ck(key)
113+
def update(self, value: V) -> None:
114+
ck = self._build_ck()
115115
self._store.put(ck, self._value_codec.encode(value))
116116

117-
def get(self, key: K) -> Optional[V]:
118-
ck = self._build_ck(key)
117+
def value(self) -> Optional[V]:
118+
ck = self._build_ck()
119119
raw = self._store.get(ck)
120120
if raw is None:
121121
return None
122122
return self._value_codec.decode(raw)
123123

124-
def delete(self, key: K) -> None:
125-
self._store.delete(self._build_ck(key))
124+
def clear(self) -> None:
125+
self._store.delete(self._build_ck())
126126

127127

128128
__all__ = ["KeyedValueState", "KeyedValueStateFactory"]

python/functionstream-api-advanced/src/fs_api_advanced/structures/value_state.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def from_context_auto_codec(cls, ctx: Any, store_name: str) -> "ValueState[T]":
5353
def update(self, value: T) -> None:
5454
self._store.put(self._ck, self._codec.encode(value))
5555

56-
def value(self) -> Tuple[Optional[T], bool]:
56+
def value(self) -> Optional[T]:
5757
raw = self._store.get(self._ck)
5858
if raw is None:
5959
return (None, False)

0 commit comments

Comments
 (0)