Skip to content

Commit 7090143

Browse files
committed
update
1 parent 44441d7 commit 7090143

File tree

6 files changed

+27
-19
lines changed

6 files changed

+27
-19
lines changed

python/functionstream-api-advanced/src/fs_api_advanced/keyed/keyed_aggregating_state.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from typing import Any, Generic, Optional, Protocol, Tuple, TypeVar
1414

1515
from fs_api.store import ComplexKey, KvError, KvStore
16-
from fs_api_advanced.codec import Codec, PickleCodec, default_codec_for
16+
from fs_api_advanced.codec import Codec, default_codec_for
1717

1818
T_agg = TypeVar("T_agg")
1919
ACC = TypeVar("ACC")
@@ -74,7 +74,9 @@ def from_context_auto_codec(
7474
) -> "KeyedAggregatingStateFactory[T_agg, ACC, R]":
7575
"""Create a KeyedAggregatingStateFactory with default accumulator codec from context and store name."""
7676
store = ctx.getOrCreateKVStore(store_name)
77-
codec = default_codec_for(acc_type) if acc_type is not None else PickleCodec()
77+
if acc_type is None:
78+
raise KvError("keyed aggregating state from_context_auto_codec requires acc_type")
79+
codec = default_codec_for(acc_type)
7880
return cls(store, key_group, codec, agg_func)
7981

8082
def new_aggregating_state(

python/functionstream-api-advanced/src/fs_api_advanced/keyed/keyed_list_state.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from typing import Any, Generic, List, Optional, TypeVar
1515

1616
from fs_api.store import ComplexKey, KvError, KvStore
17-
from fs_api_advanced.codec import Codec, PickleCodec, default_codec_for
17+
from fs_api_advanced.codec import Codec, default_codec_for
1818

1919
V = TypeVar("V")
2020

@@ -67,7 +67,9 @@ def from_context_auto_codec(
6767
) -> "KeyedListStateFactory[V]":
6868
"""Create a KeyedListStateFactory with default value codec from context and store name."""
6969
store = ctx.getOrCreateKVStore(store_name)
70-
codec = default_codec_for(value_type) if value_type is not None else PickleCodec()
70+
if value_type is None:
71+
raise KvError("keyed list state from_context_auto_codec requires value_type")
72+
codec = default_codec_for(value_type)
7173
return cls(store, key_group, codec)
7274

7375
def new_keyed_list(self, key: bytes, namespace: bytes) -> "KeyedListState[V]":

python/functionstream-api-advanced/src/fs_api_advanced/keyed/keyed_map_state.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from typing import Any, Generic, Iterator, Optional, Tuple, TypeVar
1414

1515
from fs_api.store import ComplexKey, KvError, KvStore
16-
from fs_api_advanced.codec import Codec, PickleCodec, BytesCodec, default_codec_for
16+
from fs_api_advanced.codec import Codec, default_codec_for
1717

1818
from ._keyed_common import ensure_ordered_key_codec
1919

@@ -69,14 +69,12 @@ def from_context_auto_codec(
6969
) -> "KeyedMapStateFactory[MK, MV]":
7070
"""Create a KeyedMapStateFactory with default codecs for MK and MV. Map key type must have an ordered default codec."""
7171
store = ctx.getOrCreateKVStore(store_name)
72-
map_key_codec = (
73-
default_codec_for(map_key_type) if map_key_type is not None else BytesCodec()
74-
)
75-
map_value_codec = (
76-
default_codec_for(map_value_type)
77-
if map_value_type is not None
78-
else PickleCodec()
79-
)
72+
if map_key_type is None:
73+
raise KvError("keyed map state from_context_auto_codec requires map_key_type")
74+
if map_value_type is None:
75+
raise KvError("keyed map state from_context_auto_codec requires map_value_type")
76+
map_key_codec = default_codec_for(map_key_type)
77+
map_value_codec = default_codec_for(map_value_type)
8078
return cls(store, key_group, map_key_codec, map_value_codec)
8179

8280
def new_keyed_map(

python/functionstream-api-advanced/src/fs_api_advanced/keyed/keyed_priority_queue_state.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from typing import Any, Generic, Iterator, Optional, Tuple, TypeVar
1414

1515
from fs_api.store import ComplexKey, KvError, KvStore
16-
from fs_api_advanced.codec import Codec, IntCodec, default_codec_for
16+
from fs_api_advanced.codec import Codec, default_codec_for
1717

1818
from ._keyed_common import ensure_ordered_key_codec
1919

@@ -62,7 +62,9 @@ def from_context_auto_codec(
6262
) -> "KeyedPriorityQueueStateFactory[V]":
6363
"""Create a KeyedPriorityQueueStateFactory with default value codec. V must have an ordered default codec."""
6464
store = ctx.getOrCreateKVStore(store_name)
65-
codec = default_codec_for(item_type) if item_type is not None else IntCodec()
65+
if item_type is None:
66+
raise KvError("keyed priority queue state from_context_auto_codec requires item_type")
67+
codec = default_codec_for(item_type)
6668
return cls(store, key_group, codec)
6769

6870
def new_keyed_priority_queue(

python/functionstream-api-advanced/src/fs_api_advanced/keyed/keyed_reducing_state.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from typing import Any, Callable, Generic, Optional, Tuple, TypeVar
1414

1515
from fs_api.store import ComplexKey, KvError, KvStore
16-
from fs_api_advanced.codec import Codec, PickleCodec, default_codec_for
16+
from fs_api_advanced.codec import Codec, default_codec_for
1717

1818
V = TypeVar("V")
1919

@@ -68,7 +68,9 @@ def from_context_auto_codec(
6868
) -> "KeyedReducingStateFactory[V]":
6969
"""Create a KeyedReducingStateFactory with default value codec from context and store name."""
7070
store = ctx.getOrCreateKVStore(store_name)
71-
codec = default_codec_for(value_type) if value_type is not None else PickleCodec()
71+
if value_type is None:
72+
raise KvError("keyed reducing state from_context_auto_codec requires value_type")
73+
codec = default_codec_for(value_type)
7274
return cls(store, key_group, codec, reduce_func)
7375

7476
def new_reducing_state(

python/functionstream-api-advanced/src/fs_api_advanced/keyed/keyed_value_state.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from typing import Any, Generic, Optional, Tuple, TypeVar
1414

1515
from fs_api.store import ComplexKey, KvError, KvStore
16-
from fs_api_advanced.codec import Codec, PickleCodec, default_codec_for
16+
from fs_api_advanced.codec import Codec, default_codec_for
1717

1818
V = TypeVar("V")
1919

@@ -59,7 +59,9 @@ def from_context_auto_codec(
5959
) -> "KeyedValueStateFactory[V]":
6060
"""Create a KeyedValueStateFactory with default value codec from context and store name."""
6161
store = ctx.getOrCreateKVStore(store_name)
62-
codec = default_codec_for(value_type) if value_type is not None else PickleCodec()
62+
if value_type is None:
63+
raise KvError("keyed value state from_context_auto_codec requires value_type")
64+
codec = default_codec_for(value_type)
6365
return cls(store, key_group, codec)
6466

6567

0 commit comments

Comments
 (0)