Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions pyignite/aio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
from .queries.query import CacheInfo
from .stream import AioBinaryStream, READ_BACKWARD
from .utils import cache_id, entity_id, status_to_exception, is_wrapped
from .utils import cache_id, entity_id, status_to_exception


__all__ = ['AioClient']
Expand Down Expand Up @@ -269,11 +269,24 @@ async def unwrap_binary(self, value: Any) -> Any:
:return: the result of the Binary Object unwrapping with all other data
left intact.
"""
if is_wrapped(value):
blob, offset = value
with AioBinaryStream(self, blob) as stream:
data_class = await BinaryObject.parse_async(stream)
return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
if isinstance(value, tuple) and len(value) == 2:
if type(value[0]) is bytes and type(value[1]) is int:
blob, offset = value
with AioBinaryStream(self, blob) as stream:
data_class = await BinaryObject.parse_async(stream)
return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD),
client=self)

if isinstance(value[0], int):
col_type, collection = value
if isinstance(collection, list):
coros = [self.unwrap_binary(v) for v in collection]
return col_type, await asyncio.gather(*coros)

if isinstance(collection, dict):
coros = [asyncio.gather(self.unwrap_binary(k), self.unwrap_binary(v))
for k, v in collection.items()]
return col_type, dict(await asyncio.gather(*coros))
return value

@status_to_exception(CacheError)
Expand Down Expand Up @@ -351,7 +364,7 @@ async def get_best_node(

key, key_hint = self._get_affinity_key(c_id, key, key_hint)

hashcode = await key_hint.hashcode_async(key, self)
hashcode = await key_hint.hashcode_async(key, client=self)

best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
if best_node:
Expand Down
27 changes: 18 additions & 9 deletions pyignite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
from .queries.query import CacheInfo
from .stream import BinaryStream, READ_BACKWARD
from .utils import (
cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, is_wrapped,
cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable,
get_field_by_id, unsigned
)
from .binary import GenericObjectMeta
Expand Down Expand Up @@ -539,17 +539,26 @@ def query_binary_type(self, binary_type: Union[int, str], schema: Union[int, dic

def unwrap_binary(self, value: Any) -> Any:
"""
Detects and recursively unwraps Binary Object.
Detects and recursively unwraps Binary Object or collections of BinaryObject.

:param value: anything that could be a Binary Object,
:param value: anything that could be a Binary Object or collection of BinaryObject,
:return: the result of the Binary Object unwrapping with all other data
left intact.
"""
if is_wrapped(value):
blob, offset = value
with BinaryStream(self, blob) as stream:
data_class = BinaryObject.parse(stream)
return BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
if isinstance(value, tuple) and len(value) == 2:
if type(value[0]) is bytes and type(value[1]) is int:
blob, offset = value
with BinaryStream(self, blob) as stream:
data_class = BinaryObject.parse(stream)
return BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), client=self)

if isinstance(value[0], int):
col_type, collection = value
if isinstance(collection, list):
return col_type, [self.unwrap_binary(v) for v in collection]

if isinstance(collection, dict):
return col_type, {self.unwrap_binary(k): self.unwrap_binary(v) for k, v in collection.items()}
return value

@status_to_exception(CacheError)
Expand Down Expand Up @@ -619,7 +628,7 @@ def get_best_node(
return conn

key, key_hint = self._get_affinity_key(c_id, key, key_hint)
hashcode = key_hint.hashcode(key, self)
hashcode = key_hint.hashcode(key, client=self)

best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
if best_node:
Expand Down
12 changes: 6 additions & 6 deletions pyignite/datatypes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ class IgniteDataType(metaclass=IgniteDataTypeMeta):
classes, both object and payload varieties.
"""
@classmethod
async def hashcode_async(cls, value, *args, **kwargs):
return cls.hashcode(value, *args, **kwargs)
async def hashcode_async(cls, value, **kwargs):
return cls.hashcode(value, **kwargs)

@classmethod
def hashcode(cls, value, *args, **kwargs):
def hashcode(cls, value, **kwargs):
return 0

@classmethod
Expand All @@ -72,9 +72,9 @@ async def from_python_async(cls, stream, value, **kwargs):
cls.from_python(stream, value, **kwargs)

@classmethod
def to_python(cls, ctypes_object, *args, **kwargs):
def to_python(cls, ctypes_object, **kwargs):
raise NotImplementedError

@classmethod
async def to_python_async(cls, ctypes_object, *args, **kwargs):
return cls.to_python(ctypes_object, *args, **kwargs)
async def to_python_async(cls, ctypes_object, **kwargs):
return cls.to_python(ctypes_object, **kwargs)
12 changes: 6 additions & 6 deletions pyignite/datatypes/cache_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ async def parse_async(cls, stream):
return cls.parse(stream)

@classmethod
def to_python(cls, ctypes_object, *args, **kwargs):
return cls.prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
def to_python(cls, ctypes_object, **kwargs):
return cls.prop_data_class.to_python(ctypes_object.data, **kwargs)

@classmethod
async def to_python_async(cls, ctypes_object, *args, **kwargs):
return cls.to_python(ctypes_object, *args, **kwargs)
async def to_python_async(cls, ctypes_object, **kwargs):
return cls.to_python(ctypes_object, **kwargs)

@classmethod
def from_python(cls, stream, value):
Expand Down Expand Up @@ -302,6 +302,6 @@ def from_python(cls, stream, value):
)

@classmethod
def to_python(cls, ctypes_object, *args, **kwargs):
def to_python(cls, ctypes_object, **kwargs):
prop_data_class = prop_map(ctypes_object.prop_code)
return prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
return prop_data_class.to_python(ctypes_object.data, **kwargs)
66 changes: 26 additions & 40 deletions pyignite/datatypes/complex.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,21 @@ def __build_final_class(cls, fields):
)

@classmethod
def to_python_not_null(cls, ctypes_object, *args, **kwargs):
def to_python_not_null(cls, ctypes_object, **kwargs):
result = []
for i in range(ctypes_object.length):
result.append(
AnyDataObject.to_python(
getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
getattr(ctypes_object, f'element_{i}'), **kwargs
)
)
return ctypes_object.type_id, result

@classmethod
async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
async def to_python_not_null_async(cls, ctypes_object, **kwargs):
result = [
await AnyDataObject.to_python_async(
getattr(ctypes_object, f'element_{i}'), *args, **kwargs
getattr(ctypes_object, f'element_{i}'), **kwargs
)
for i in range(ctypes_object.length)]
return ctypes_object.type_id, result
Expand Down Expand Up @@ -223,8 +222,6 @@ class CollectionObject(Nullable):
_type_id = TYPE_COL
_header_class = None
type_code = TC_COLLECTION
pythonic = list
default = []

@classmethod
def parse_not_null(cls, stream):
Expand Down Expand Up @@ -271,15 +268,15 @@ def __build_final_class(cls, fields):
@classmethod
def to_python_not_null(cls, ctypes_object, *args, **kwargs):
result = [
AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), **kwargs)
for i in range(ctypes_object.length)
]
return ctypes_object.type, result

@classmethod
async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
result_coro = [
AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), **kwargs)
for i in range(ctypes_object.length)
]

Expand Down Expand Up @@ -361,35 +358,27 @@ def __build_final_class(cls, fields):
)

@classmethod
def _to_python(cls, ctypes_object, *args, **kwargs):
def _to_python(cls, ctypes_object, **kwargs):
map_cls = cls.__get_map_class(ctypes_object)

result = map_cls()
for i in range(0, ctypes_object.length << 1, 2):
k = AnyDataObject.to_python(
getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
)
v = AnyDataObject.to_python(
getattr(ctypes_object, f'element_{i + 1}'),
*args, **kwargs
)
k = AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), **kwargs)
v = AnyDataObject.to_python(getattr(ctypes_object, f'element_{i + 1}'), **kwargs)
result[k] = v
return result

@classmethod
async def _to_python_async(cls, ctypes_object, *args, **kwargs):
async def _to_python_async(cls, ctypes_object, **kwargs):
map_cls = cls.__get_map_class(ctypes_object)

kv_pairs_coro = [
asyncio.gather(
AnyDataObject.to_python_async(
getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
getattr(ctypes_object, f'element_{i}'), **kwargs
),
AnyDataObject.to_python_async(
getattr(ctypes_object, f'element_{i + 1}'),
*args, **kwargs
getattr(ctypes_object, f'element_{i + 1}'), **kwargs
)
) for i in range(0, ctypes_object.length << 1, 2)
]
Expand Down Expand Up @@ -449,12 +438,12 @@ def _parse_header(cls, stream):
return [('length', ctypes.c_int)], length

@classmethod
def to_python(cls, ctypes_object, *args, **kwargs):
return cls._to_python(ctypes_object, *args, **kwargs)
def to_python(cls, ctypes_object, **kwargs):
return cls._to_python(ctypes_object, **kwargs)

@classmethod
async def to_python_async(cls, ctypes_object, *args, **kwargs):
return await cls._to_python_async(ctypes_object, *args, **kwargs)
async def to_python_async(cls, ctypes_object, **kwargs):
return await cls._to_python_async(ctypes_object, **kwargs)

@classmethod
def from_python(cls, stream, value, type_id=None):
Expand Down Expand Up @@ -484,8 +473,6 @@ class MapObject(Nullable, _MapBase):
_type_name = NAME_MAP
_type_id = TYPE_MAP
type_code = TC_MAP
pythonic = dict
default = {}

@classmethod
def parse_not_null(cls, stream):
Expand All @@ -507,12 +494,12 @@ def _parse_header(cls, stream):
return fields, length

@classmethod
def to_python_not_null(cls, ctypes_object, *args, **kwargs):
return ctypes_object.type, cls._to_python(ctypes_object, *args, **kwargs)
def to_python_not_null(cls, ctypes_object, **kwargs):
return ctypes_object.type, cls._to_python(ctypes_object, **kwargs)

@classmethod
async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
return ctypes_object.type, await cls._to_python_async(ctypes_object, *args, **kwargs)
async def to_python_not_null_async(cls, ctypes_object, **kwargs):
return ctypes_object.type, await cls._to_python_async(ctypes_object, **kwargs)

@classmethod
def from_python_not_null(cls, stream, value, **kwargs):
Expand Down Expand Up @@ -557,7 +544,7 @@ class BinaryObject(Nullable):
COMPACT_FOOTER = 0x0020

@classmethod
def hashcode(cls, value: object, client: Optional['Client']) -> int:
def hashcode(cls, value: object, client: Optional['Client'] = None) -> int:
# binary objects's hashcode implementation is special in the sense
# that you need to fully serialize the object to calculate
# its hashcode
Expand All @@ -568,7 +555,7 @@ def hashcode(cls, value: object, client: Optional['Client']) -> int:
return value._hashcode

@classmethod
async def hashcode_async(cls, value: object, client: Optional['AioClient']) -> int:
async def hashcode_async(cls, value: object, client: Optional['AioClient'] = None) -> int:
if not value._hashcode and client:
with AioBinaryStream(client) as stream:
await value._from_python_async(stream, save_to_buf=True)
Expand Down Expand Up @@ -680,7 +667,7 @@ def __build_final_class(cls, stream, header, header_class, object_fields, fields
return final_class

@classmethod
def to_python_not_null(cls, ctypes_object, client: 'Client' = None, *args, **kwargs):
def to_python_not_null(cls, ctypes_object, client: 'Client' = None, **kwargs):
type_id = ctypes_object.type_id
if not client:
raise ParseError(f'Can not query binary type {type_id}')
Expand All @@ -692,14 +679,13 @@ def to_python_not_null(cls, ctypes_object, client: 'Client' = None, *args, **kwa
for field_name, field_type in data_class.schema.items():
setattr(
result, field_name, field_type.to_python(
getattr(ctypes_object.object_fields, field_name),
client, *args, **kwargs
getattr(ctypes_object.object_fields, field_name), client=client, **kwargs
)
)
return result

@classmethod
async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, *args, **kwargs):
async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, **kwargs):
type_id = ctypes_object.type_id
if not client:
raise ParseError(f'Can not query binary type {type_id}')
Expand All @@ -711,7 +697,7 @@ async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = Non
field_values = await asyncio.gather(
*[
field_type.to_python_async(
getattr(ctypes_object.object_fields, field_name), client, *args, **kwargs
getattr(ctypes_object.object_fields, field_name), client=client, **kwargs
)
for field_name, field_type in data_class.schema.items()
]
Expand Down
4 changes: 2 additions & 2 deletions pyignite/datatypes/expiry_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ async def parse_async(cls, stream):
return cls.parse(stream)

@classmethod
def to_python(cls, ctypes_object):
def to_python(cls, ctypes_object, **kwargs):
if ctypes_object == 0:
return None

return ExpiryPolicy(create=ctypes_object.create, update=ctypes_object.update, access=ctypes_object.access)

@classmethod
async def to_python_async(cls, ctypes_object):
async def to_python_async(cls, ctypes_object, **kwargs):
return cls.to_python(ctypes_object)

@classmethod
Expand Down
Loading