diff --git a/pyignite/binary.py b/pyignite/binary.py index 5a5f895..551f1d0 100644 --- a/pyignite/binary.py +++ b/pyignite/binary.py @@ -151,7 +151,7 @@ async def _from_python_async(self, stream, save_to_buf=False): write_footer(self, stream, header, header_class, schema_items, offsets, initial_pos, save_to_buf) def write_header(obj, stream): - header_class = BinaryObject.build_header() + header_class = BinaryObject.get_header_class() header = header_class() header.type_code = int.from_bytes( BinaryObject.type_code, diff --git a/pyignite/datatypes/base.py b/pyignite/datatypes/base.py index fbd798b..87b251c 100644 --- a/pyignite/datatypes/base.py +++ b/pyignite/datatypes/base.py @@ -72,9 +72,9 @@ async def from_python_async(cls, stream, value, **kwargs): cls.from_python(stream, value, **kwargs) @classmethod - def to_python(cls, ctype_object, *args, **kwargs): + def to_python(cls, ctypes_object, *args, **kwargs): raise NotImplementedError @classmethod - async def to_python_async(cls, ctype_object, *args, **kwargs): - return cls.to_python(ctype_object, *args, **kwargs) + async def to_python_async(cls, ctypes_object, *args, **kwargs): + return cls.to_python(ctypes_object, *args, **kwargs) diff --git a/pyignite/datatypes/cache_properties.py b/pyignite/datatypes/cache_properties.py index d924507..9bf34de 100644 --- a/pyignite/datatypes/cache_properties.py +++ b/pyignite/datatypes/cache_properties.py @@ -115,12 +115,12 @@ async def parse_async(cls, stream): return cls.parse(stream) @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - return cls.prop_data_class.to_python(ctype_object.data, *args, **kwargs) + def to_python(cls, ctypes_object, *args, **kwargs): + return cls.prop_data_class.to_python(ctypes_object.data, *args, **kwargs) @classmethod - async def to_python_async(cls, ctype_object, *args, **kwargs): - return cls.to_python(ctype_object, *args, **kwargs) + async def to_python_async(cls, ctypes_object, *args, **kwargs): + return cls.to_python(ctypes_object, *args, **kwargs) @classmethod def from_python(cls, stream, value): @@ -295,6 +295,6 @@ def from_python(cls, stream, value): ) @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - prop_data_class = prop_map(ctype_object.prop_code) - return prop_data_class.to_python(ctype_object.data, *args, **kwargs) + def to_python(cls, ctypes_object, *args, **kwargs): + prop_data_class = prop_map(ctypes_object.prop_code) + return prop_data_class.to_python(ctypes_object.data, *args, **kwargs) diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py index 5cb6160..119c552 100644 --- a/pyignite/datatypes/complex.py +++ b/pyignite/datatypes/complex.py @@ -20,6 +20,7 @@ from pyignite.constants import * from pyignite.exceptions import ParseError +from .base import IgniteDataType from .internal import AnyDataObject, Struct, infer_from_python, infer_from_python_async from .type_codes import * from .type_ids import * @@ -41,122 +42,100 @@ class ObjectArrayObject(Nullable): _type_name = NAME_OBJ_ARR _type_id = TYPE_OBJ_ARR + _fields = [ + ('type_code', ctypes.c_byte), + ('type_id', ctypes.c_int), + ('length', ctypes.c_int) + ] type_code = TC_OBJECT_ARRAY - @classmethod - def build_header(cls): - return type( - cls.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('type_code', ctypes.c_byte), - ('type_id', ctypes.c_int), - ('length', ctypes.c_int), - ], - } - ) - @classmethod def parse_not_null(cls, stream): - header, header_class = cls.__parse_header(stream) + length, fields = cls.__get_length(stream), [] - fields = [] - for i in range(header.length): + for i in range(length): c_type = AnyDataObject.parse(stream) - fields.append(('element_{}'.format(i), c_type)) + fields.append((f'element_{i}', c_type)) - return cls.__build_final_class(header_class, fields) + return cls.__build_final_class(fields) @classmethod async def parse_not_null_async(cls, stream): - header, header_class = cls.__parse_header(stream) - - fields = [] - for i in range(header.length): + length, fields = cls.__get_length(stream), [] + for i in range(length): c_type = await AnyDataObject.parse_async(stream) - fields.append(('element_{}'.format(i), c_type)) + fields.append((f'element_{i}', c_type)) - return cls.__build_final_class(header_class, fields) + return cls.__build_final_class(fields) @classmethod - def __parse_header(cls, stream): - header_class = cls.build_header() - header = stream.read_ctype(header_class) - stream.seek(ctypes.sizeof(header_class), SEEK_CUR) - return header, header_class + def __get_length(cls, stream): + int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) + length = int.from_bytes( + stream.slice(stream.tell() + b_sz + int_sz, int_sz), + byteorder=PROTOCOL_BYTE_ORDER + ) + stream.seek(2 * int_sz + b_sz, SEEK_CUR) + return length @classmethod - def __build_final_class(cls, header_class, fields): + def __build_final_class(cls, fields): return type( cls.__name__, - (header_class,), + (ctypes.LittleEndianStructure,), { '_pack_': 1, - '_fields_': fields, + '_fields_': cls._fields + fields, } ) @classmethod - def to_python_not_null(cls, ctype_object, *args, **kwargs): + def to_python_not_null(cls, ctypes_object, *args, **kwargs): result = [] - for i in range(ctype_object.length): + for i in range(ctypes_object.length): result.append( AnyDataObject.to_python( - getattr(ctype_object, 'element_{}'.format(i)), + getattr(ctypes_object, f'element_{i}'), *args, **kwargs ) ) - return ctype_object.type_id, result + return ctypes_object.type_id, result @classmethod - async def to_python_not_null_async(cls, ctype_object, *args, **kwargs): + async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs): result = [ await AnyDataObject.to_python_async( - getattr(ctype_object, 'element_{}'.format(i)), *args, **kwargs + getattr(ctypes_object, f'element_{i}'), *args, **kwargs ) - for i in range(ctype_object.length)] - return ctype_object.type_id, result + for i in range(ctypes_object.length)] + return ctypes_object.type_id, result @classmethod def from_python_not_null(cls, stream, value, *args, **kwargs): - type_or_id, value = value - try: - length = len(value) - except TypeError: - value = [value] - length = 1 - - cls.__write_header(stream, type_or_id, length) + value = cls.__write_header(stream, value) for x in value: infer_from_python(stream, x) @classmethod async def from_python_not_null_async(cls, stream, value, *args, **kwargs): - type_or_id, value = value + value = cls.__write_header(stream, value) + for x in value: + await infer_from_python_async(stream, x) + + @classmethod + def __write_header(cls, stream, value): + type_id, value = value try: length = len(value) except TypeError: value = [value] length = 1 - cls.__write_header(stream, type_or_id, length) - for x in value: - await infer_from_python_async(stream, x) + stream.write(cls.type_code) + stream.write(type_id.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER, signed=True)) + stream.write(length.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER)) - @classmethod - def __write_header(cls, stream, type_or_id, length): - header_class = cls.build_header() - header = header_class() - header.type_code = int.from_bytes( - cls.type_code, - byteorder=PROTOCOL_BYTE_ORDER - ) - header.length = length - header.type_id = type_or_id - - stream.write(header) + return value class WrappedDataObject(Nullable): @@ -170,32 +149,23 @@ class WrappedDataObject(Nullable): """ type_code = TC_ARRAY_WRAPPED_OBJECTS - @classmethod - def build_header(cls): - return type( - cls.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('type_code', ctypes.c_byte), - ('length', ctypes.c_int), - ], - } - ) - @classmethod def parse_not_null(cls, stream): - header_class = cls.build_header() - header = stream.read_ctype(header_class) + int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) + length = int.from_bytes( + stream.slice(stream.tell() + b_sz, int_sz), + byteorder=PROTOCOL_BYTE_ORDER + ) final_class = type( cls.__name__, - (header_class,), + (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': [ - ('payload', ctypes.c_byte * header.length), + ('type_code', ctypes.c_byte), + ('length', ctypes.c_int), + ('payload', ctypes.c_byte * length), ('offset', ctypes.c_int), ], } @@ -205,11 +175,11 @@ def parse_not_null(cls, stream): return final_class @classmethod - def to_python_not_null(cls, ctype_object, *args, **kwargs): - return bytes(ctype_object.payload), ctype_object.offset + def to_python_not_null(cls, ctypes_object, *args, **kwargs): + return bytes(ctypes_object.payload), ctypes_object.offset @classmethod - def from_python(cls, stream, value, *args, **kwargs): + def from_python_not_null(cls, stream, value, *args, **kwargs): raise ParseError('Send unwrapped data.') @@ -251,59 +221,47 @@ class CollectionObject(Nullable): _type_name = NAME_COL _type_id = TYPE_COL + _header_class = None type_code = TC_COLLECTION pythonic = list default = [] - @classmethod - def build_header(cls): - return type( - cls.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('type_code', ctypes.c_byte), - ('length', ctypes.c_int), - ('type', ctypes.c_byte), - ], - } - ) - @classmethod def parse_not_null(cls, stream): - header, header_class = cls.__parse_header(stream) + fields, length = cls.__parse_header(stream) - fields = [] - for i in range(header.length): + for i in range(length): c_type = AnyDataObject.parse(stream) - fields.append(('element_{}'.format(i), c_type)) + fields.append((f'element_{i}', c_type)) - return cls.__build_final_class(header_class, fields) + return cls.__build_final_class(fields) @classmethod async def parse_not_null_async(cls, stream): - header, header_class = cls.__parse_header(stream) + fields, length = cls.__parse_header(stream) - fields = [] - for i in range(header.length): + for i in range(length): c_type = await AnyDataObject.parse_async(stream) - fields.append(('element_{}'.format(i), c_type)) + fields.append((f'element_{i}', c_type)) - return cls.__build_final_class(header_class, fields) + return cls.__build_final_class(fields) @classmethod def __parse_header(cls, stream): - header_class = cls.build_header() - header = stream.read_ctype(header_class) - stream.seek(ctypes.sizeof(header_class), SEEK_CUR) - return header, header_class + int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) + header_fields = [('type_code', ctypes.c_byte), ('length', ctypes.c_int), ('type', ctypes.c_byte)] + length = int.from_bytes( + stream.slice(stream.tell() + b_sz, int_sz), + byteorder=PROTOCOL_BYTE_ORDER + ) + stream.seek(int_sz + 2 * b_sz, SEEK_CUR) + return header_fields, length @classmethod - def __build_final_class(cls, header_class, fields): + def __build_final_class(cls, fields): return type( cls.__name__, - (header_class,), + (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': fields, @@ -311,134 +269,91 @@ def __build_final_class(cls, header_class, fields): ) @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - length = cls.__get_length(ctype_object) - if length is None: - return None - + def to_python_not_null(cls, ctypes_object, *args, **kwargs): result = [ - AnyDataObject.to_python(getattr(ctype_object, f'element_{i}'), *args, **kwargs) - for i in range(length) + AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs) + for i in range(ctypes_object.length) ] - return ctype_object.type, result + return ctypes_object.type, result @classmethod - async def to_python_async(cls, ctype_object, *args, **kwargs): - length = cls.__get_length(ctype_object) - if length is None: - return None - + async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs): result_coro = [ - AnyDataObject.to_python_async(getattr(ctype_object, f'element_{i}'), *args, **kwargs) - for i in range(length) + AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), *args, **kwargs) + for i in range(ctypes_object.length) ] - return ctype_object.type, await asyncio.gather(*result_coro) - - @classmethod - def __get_length(cls, ctype_object): - return getattr(ctype_object, "length", None) + return ctypes_object.type, await asyncio.gather(*result_coro) @classmethod def from_python_not_null(cls, stream, value, *args, **kwargs): - type_or_id, value = value + type_id, value = value try: length = len(value) except TypeError: value = [value] length = 1 - cls.__write_header(stream, type_or_id, length) + cls.__write_header(stream, type_id, length) for x in value: infer_from_python(stream, x) @classmethod async def from_python_not_null_async(cls, stream, value, *args, **kwargs): - type_or_id, value = value + type_id, value = value try: length = len(value) except TypeError: value = [value] length = 1 - cls.__write_header(stream, type_or_id, length) + cls.__write_header(stream, type_id, length) for x in value: await infer_from_python_async(stream, x) @classmethod - def __write_header(cls, stream, type_or_id, length): - header_class = cls.build_header() - header = header_class() - header.type_code = int.from_bytes( - cls.type_code, - byteorder=PROTOCOL_BYTE_ORDER + def __write_header(cls, stream, type_id, length): + stream.write(cls.type_code) + stream.write(length.to_bytes( + ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER + )) + stream.write(type_id.to_bytes( + length=ctypes.sizeof(ctypes.c_byte), + byteorder=PROTOCOL_BYTE_ORDER, + signed=True) ) - header.length = length - header.type = type_or_id - - stream.write(header) - -class Map(Nullable): - """ - Dictionary type, payload-only. - - Ignite does not track the order of key-value pairs in its caches, hence - the ordinary Python dict type, not the collections.OrderedDict. - """ - _type_name = NAME_MAP - _type_id = TYPE_MAP +class _MapBase: HASH_MAP = 1 LINKED_HASH_MAP = 2 @classmethod - def build_header(cls): - return type( - cls.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('length', ctypes.c_int), - ], - } - ) + def _parse_header(cls, stream): + raise NotImplementedError @classmethod - def parse_not_null(cls, stream): - header, header_class = cls.__parse_header(stream) - - fields = [] - for i in range(header.length << 1): + def _parse(cls, stream): + fields, length = cls._parse_header(stream) + for i in range(length << 1): c_type = AnyDataObject.parse(stream) - fields.append(('element_{}'.format(i), c_type)) - - return cls.__build_final_class(header_class, fields) + fields.append((f'element_{i}', c_type)) + return cls.__build_final_class(fields) @classmethod - async def parse_not_null_async(cls, stream): - header, header_class = cls.__parse_header(stream) - - fields = [] - for i in range(header.length << 1): + async def _parse_async(cls, stream): + fields, length = cls._parse_header(stream) + for i in range(length << 1): c_type = await AnyDataObject.parse_async(stream) - fields.append(('element_{}'.format(i), c_type)) - - return cls.__build_final_class(header_class, fields) + fields.append((f'element_{i}', c_type)) - @classmethod - def __parse_header(cls, stream): - header_class = cls.build_header() - header = stream.read_ctype(header_class) - stream.seek(ctypes.sizeof(header_class), SEEK_CUR) - return header, header_class + return cls.__build_final_class(fields) @classmethod - def __build_final_class(cls, header_class, fields): + def __build_final_class(cls, fields): return type( cls.__name__, - (header_class,), + (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': fields, @@ -446,76 +361,118 @@ def __build_final_class(cls, header_class, fields): ) @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - map_cls = cls.__get_map_class(ctype_object) + def _to_python(cls, ctypes_object, *args, **kwargs): + map_cls = cls.__get_map_class(ctypes_object) result = map_cls() - for i in range(0, ctype_object.length << 1, 2): + for i in range(0, ctypes_object.length << 1, 2): k = AnyDataObject.to_python( - getattr(ctype_object, 'element_{}'.format(i)), + getattr(ctypes_object, f'element_{i}'), *args, **kwargs ) v = AnyDataObject.to_python( - getattr(ctype_object, 'element_{}'.format(i + 1)), + getattr(ctypes_object, f'element_{i + 1}'), *args, **kwargs ) result[k] = v return result @classmethod - async def to_python_async(cls, ctype_object, *args, **kwargs): - map_cls = cls.__get_map_class(ctype_object) + async def _to_python_async(cls, ctypes_object, *args, **kwargs): + map_cls = cls.__get_map_class(ctypes_object) kv_pairs_coro = [ asyncio.gather( AnyDataObject.to_python_async( - getattr(ctype_object, 'element_{}'.format(i)), + getattr(ctypes_object, f'element_{i}'), *args, **kwargs ), AnyDataObject.to_python_async( - getattr(ctype_object, 'element_{}'.format(i + 1)), + getattr(ctypes_object, f'element_{i + 1}'), *args, **kwargs ) - ) for i in range(0, ctype_object.length << 1, 2) + ) for i in range(0, ctypes_object.length << 1, 2) ] return map_cls(await asyncio.gather(*kv_pairs_coro)) @classmethod - def __get_map_class(cls, ctype_object): - map_type = getattr(ctype_object, 'type', cls.HASH_MAP) + def __get_map_class(cls, ctypes_object): + map_type = getattr(ctypes_object, 'type', cls.HASH_MAP) return OrderedDict if map_type == cls.LINKED_HASH_MAP else dict @classmethod - def from_python(cls, stream, value, type_id=None): - cls.__write_header(stream, type_id, len(value)) + def _from_python(cls, stream, value, type_id=None): + cls._write_header(stream, type_id, len(value)) for k, v in value.items(): infer_from_python(stream, k) infer_from_python(stream, v) @classmethod - async def from_python_async(cls, stream, value, type_id=None): - cls.__write_header(stream, type_id, len(value)) + async def _from_python_async(cls, stream, value, type_id): + cls._write_header(stream, type_id, len(value)) for k, v in value.items(): await infer_from_python_async(stream, k) await infer_from_python_async(stream, v) @classmethod - def __write_header(cls, stream, type_id, length): - header_class = cls.build_header() - header = header_class() - header.length = length + def _write_header(cls, stream, type_id, length): + raise NotImplementedError - if hasattr(header, 'type_code'): - header.type_code = int.from_bytes(cls.type_code, byteorder=PROTOCOL_BYTE_ORDER) - if hasattr(header, 'type'): - header.type = type_id +class Map(IgniteDataType, _MapBase): + """ + Dictionary type, payload-only. - stream.write(header) + Ignite does not track the order of key-value pairs in its caches, hence + the ordinary Python dict type, not the collections.OrderedDict. + """ + _type_name = NAME_MAP + _type_id = TYPE_MAP + + @classmethod + def parse(cls, stream): + return cls._parse(stream) + + @classmethod + async def parse_async(cls, stream): + return await cls._parse_async(stream) + + @classmethod + def _parse_header(cls, stream): + int_sz = ctypes.sizeof(ctypes.c_int) + length = int.from_bytes( + stream.slice(stream.tell(), int_sz), + byteorder=PROTOCOL_BYTE_ORDER + ) + stream.seek(int_sz, SEEK_CUR) + return [('length', ctypes.c_int)], length + + @classmethod + def to_python(cls, ctypes_object, *args, **kwargs): + return cls._to_python(ctypes_object, *args, **kwargs) + + @classmethod + async def to_python_async(cls, ctypes_object, *args, **kwargs): + return await cls._to_python_async(ctypes_object, *args, **kwargs) + + @classmethod + def from_python(cls, stream, value, type_id=None): + return cls._from_python(stream, value, type_id) + + @classmethod + async def from_python_async(cls, stream, value, type_id=None): + return await cls._from_python_async(stream, value, type_id) + + @classmethod + def _write_header(cls, stream, type_id, length): + stream.write(length.to_bytes( + length=ctypes.sizeof(ctypes.c_int), + byteorder=PROTOCOL_BYTE_ORDER + )) -class MapObject(Map): +class MapObject(Nullable, _MapBase): """ This is a dictionary type. @@ -531,61 +488,65 @@ class MapObject(Map): default = {} @classmethod - def build_header(cls): - return type( - cls.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('type_code', ctypes.c_byte), - ('length', ctypes.c_int), - ('type', ctypes.c_byte), - ], - } - ) + def parse_not_null(cls, stream): + return cls._parse(stream) @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - obj_type = getattr(ctype_object, "type", None) - if obj_type: - return obj_type, super().to_python(ctype_object, *args, **kwargs) - return None + async def parse_not_null_async(cls, stream): + return await cls._parse_async(stream) @classmethod - async def to_python_async(cls, ctype_object, *args, **kwargs): - obj_type = getattr(ctype_object, "type", None) - if obj_type: - return obj_type, await super().to_python_async(ctype_object, *args, **kwargs) - return None + def _parse_header(cls, stream): + int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) + length = int.from_bytes( + stream.slice(stream.tell() + b_sz, int_sz), + byteorder=PROTOCOL_BYTE_ORDER + ) + stream.seek(int_sz + 2 * b_sz, SEEK_CUR) + fields = [('type_code', ctypes.c_byte), ('length', ctypes.c_int), ('type', ctypes.c_byte)] + return fields, length @classmethod - def __get_obj_type(cls, ctype_object): - return getattr(ctype_object, "type", None) + def to_python_not_null(cls, ctypes_object, *args, **kwargs): + return ctypes_object.type, cls._to_python(ctypes_object, *args, **kwargs) @classmethod - def from_python(cls, stream, value, **kwargs): - type_id, value = cls.__unpack_value(stream, value) - if value: - super().from_python(stream, value, type_id) + async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs): + return ctypes_object.type, await cls._to_python_async(ctypes_object, *args, **kwargs) @classmethod - async def from_python_async(cls, stream, value, **kwargs): - type_id, value = cls.__unpack_value(stream, value) - if value: - await super().from_python_async(stream, value, type_id) + def from_python_not_null(cls, stream, value, **kwargs): + type_id, value = value + if value is None: + Null.from_python(stream) + else: + cls._from_python(stream, value, type_id) @classmethod - def __unpack_value(cls, stream, value): + async def from_python_not_null_async(cls, stream, value, **kwargs): + type_id, value = value if value is None: Null.from_python(stream) - return None, None + else: + await cls._from_python_async(stream, value, type_id) - return value + @classmethod + def _write_header(cls, stream, type_id, length): + stream.write(cls.type_code) + stream.write(length.to_bytes( + length=ctypes.sizeof(ctypes.c_int), + byteorder=PROTOCOL_BYTE_ORDER) + ) + stream.write(type_id.to_bytes( + length=ctypes.sizeof(ctypes.c_byte), + byteorder=PROTOCOL_BYTE_ORDER, + signed=True) + ) class BinaryObject(Nullable): _type_id = TYPE_BINARY_OBJ + _header_class = None type_code = TC_COMPLEX_OBJECT USER_TYPE = 0x0001 @@ -615,24 +576,26 @@ async def hashcode_async(cls, value: object, client: Optional['AioClient']) -> i return value._hashcode @classmethod - def build_header(cls): - return type( - cls.__name__, - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('type_code', ctypes.c_byte), - ('version', ctypes.c_byte), - ('flags', ctypes.c_short), - ('type_id', ctypes.c_int), - ('hash_code', ctypes.c_int), - ('length', ctypes.c_int), - ('schema_id', ctypes.c_int), - ('schema_offset', ctypes.c_int), - ], - } - ) + def get_header_class(cls): + if not cls._header_class: + cls._header_class = type( + cls.__name__, + (ctypes.LittleEndianStructure,), + { + '_pack_': 1, + '_fields_': [ + ('type_code', ctypes.c_byte), + ('version', ctypes.c_byte), + ('flags', ctypes.c_short), + ('type_id', ctypes.c_int), + ('hash_code', ctypes.c_int), + ('length', ctypes.c_int), + ('schema_id', ctypes.c_int), + ('schema_offset', ctypes.c_int), + ], + } + ) + return cls._header_class @classmethod def offset_c_type(cls, flags: int): @@ -686,7 +649,7 @@ async def parse_not_null_async(cls, stream): @classmethod def __parse_header(cls, stream): - header_class = cls.build_header() + header_class = cls.get_header_class() header = stream.read_ctype(header_class) stream.seek(ctypes.sizeof(header_class), SEEK_CUR) return header, header_class @@ -717,51 +680,51 @@ def __build_final_class(cls, stream, header, header_class, object_fields, fields return final_class @classmethod - def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs): - type_id = cls.__get_type_id(ctype_object, client) - if type_id: - data_class = client.query_binary_type(type_id, ctype_object.schema_id) - - result = data_class() - result.version = ctype_object.version - for field_name, field_type in data_class.schema.items(): - setattr( - result, field_name, field_type.to_python( - getattr(ctype_object.object_fields, field_name), - client, *args, **kwargs - ) - ) - return result + def to_python_not_null(cls, ctypes_object, client: 'Client' = None, *args, **kwargs): + type_id = ctypes_object.type_id + if not client: + raise ParseError(f'Can not query binary type {type_id}') - return None + data_class = client.query_binary_type(type_id, ctypes_object.schema_id) + result = data_class() + result.version = ctypes_object.version - @classmethod - async def to_python_async(cls, ctype_object, client: 'AioClient' = None, *args, **kwargs): - type_id = cls.__get_type_id(ctype_object, client) - if type_id: - data_class = await client.query_binary_type(type_id, ctype_object.schema_id) - - result = data_class() - result.version = ctype_object.version - - field_values = await asyncio.gather( - *[ - field_type.to_python_async( - getattr(ctype_object.object_fields, field_name), client, *args, **kwargs - ) - for field_name, field_type in data_class.schema.items() - ] + for field_name, field_type in data_class.schema.items(): + setattr( + result, field_name, field_type.to_python( + getattr(ctypes_object.object_fields, field_name), + client, *args, **kwargs + ) ) + return result - for i, field_name in enumerate(data_class.schema.keys()): - setattr(result, field_name, field_values[i]) + @classmethod + async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, *args, **kwargs): + type_id = ctypes_object.type_id + if not client: + raise ParseError(f'Can not query binary type {type_id}') - return result - return None + data_class = await client.query_binary_type(type_id, ctypes_object.schema_id) + result = data_class() + result.version = ctypes_object.version + + field_values = await asyncio.gather( + *[ + field_type.to_python_async( + getattr(ctypes_object.object_fields, field_name), client, *args, **kwargs + ) + for field_name, field_type in data_class.schema.items() + ] + ) + + for i, field_name in enumerate(data_class.schema.keys()): + setattr(result, field_name, field_values[i]) + + return result @classmethod - def __get_type_id(cls, ctype_object, client): - type_id = getattr(ctype_object, "type_id", None) + def __get_type_id(cls, ctypes_object, client): + type_id = getattr(ctypes_object, "type_id", None) if type_id: if not client: raise ParseError(f'Can not query binary type {type_id}') diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py index 55ed844..9bd1b76 100644 --- a/pyignite/datatypes/internal.py +++ b/pyignite/datatypes/internal.py @@ -136,15 +136,15 @@ async def parse_async(self, stream, context): return await self.var1.parse_async(stream) return await self.var2.parse_async(stream) - def to_python(self, ctype_object, context, *args, **kwargs): + def to_python(self, ctypes_object, context, *args, **kwargs): if self.predicate2(context): - return self.var1.to_python(ctype_object, *args, **kwargs) - return self.var2.to_python(ctype_object, *args, **kwargs) + return self.var1.to_python(ctypes_object, *args, **kwargs) + return self.var2.to_python(ctypes_object, *args, **kwargs) - async def to_python_async(self, ctype_object, context, *args, **kwargs): + async def to_python_async(self, ctypes_object, context, *args, **kwargs): if self.predicate2(context): - return await self.var1.to_python_async(ctype_object, *args, **kwargs) - return await self.var2.to_python_async(ctype_object, *args, **kwargs) + return await self.var1.to_python_async(ctypes_object, *args, **kwargs) + return await self.var2.to_python_async(ctypes_object, *args, **kwargs) @attr.s @@ -154,67 +154,56 @@ class StructArray: counter_type = attr.ib(default=ctypes.c_int) defaults = attr.ib(type=dict, default={}) - def build_header_class(self): - return type( - self.__class__.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('length', self.counter_type), - ], - }, - ) - def parse(self, stream): - fields, length = [], self.__parse_length(stream) + fields, length = self.__parse_header(stream) for i in range(length): c_type = Struct(self.following).parse(stream) - fields.append(('element_{}'.format(i), c_type)) + fields.append((f'element_{i}', c_type)) - return self.__build_final_class(fields) + return self.build_c_type(fields) async def parse_async(self, stream): - fields, length = [], self.__parse_length(stream) + fields, length = self.__parse_header(stream) for i in range(length): c_type = await Struct(self.following).parse_async(stream) - fields.append(('element_{}'.format(i), c_type)) + fields.append((f'element_{i}', c_type)) - return self.__build_final_class(fields) + return self.build_c_type(fields) - def __parse_length(self, stream): - counter_type_len = ctypes.sizeof(self.counter_type) + def __parse_header(self, stream): + counter_sz = ctypes.sizeof(self.counter_type) length = int.from_bytes( - stream.slice(offset=counter_type_len), + stream.slice(offset=counter_sz), byteorder=PROTOCOL_BYTE_ORDER ) - stream.seek(counter_type_len, SEEK_CUR) - return length + stream.seek(counter_sz, SEEK_CUR) + return [('length', self.counter_type)], length - def __build_final_class(self, fields): + @staticmethod + def build_c_type(fields): return type( 'StructArray', - (self.build_header_class(),), + (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': fields, }, ) - def to_python(self, ctype_object, *args, **kwargs): - length = getattr(ctype_object, 'length', 0) + def to_python(self, ctypes_object, *args, **kwargs): + length = getattr(ctypes_object, 'length', 0) return [ - Struct(self.following, dict_type=dict).to_python(getattr(ctype_object, 'element_{}'.format(i)), + Struct(self.following, dict_type=dict).to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs) for i in range(length) ] - async def to_python_async(self, ctype_object, *args, **kwargs): - length = getattr(ctype_object, 'length', 0) + async def to_python_async(self, ctypes_object, *args, **kwargs): + length = getattr(ctypes_object, 'length', 0) result_coro = [ - Struct(self.following, dict_type=dict).to_python_async(getattr(ctype_object, 'element_{}'.format(i)), + Struct(self.following, dict_type=dict).to_python_async(getattr(ctypes_object, f'element_{i}'), *args, **kwargs) for i in range(length) ] @@ -239,10 +228,10 @@ async def from_python_async(self, stream, value): await el_class.from_python_async(stream, v[name]) def __write_header(self, stream, length): - header_class = self.build_header_class() - header = header_class() - header.length = length - stream.write(header) + stream.write( + length.to_bytes(ctypes.sizeof(self.counter_type), + byteorder=PROTOCOL_BYTE_ORDER) + ) @attr.s @@ -262,7 +251,7 @@ def parse(self, stream): if name in ctx: ctx[name] = stream.read_ctype(c_type, direction=READ_BACKWARD) - return self.__build_final_class(fields) + return self.build_c_type(fields) async def parse_async(self, stream): fields, ctx = [], self.__prepare_conditional_ctx() @@ -274,7 +263,7 @@ async def parse_async(self, stream): if name in ctx: ctx[name] = stream.read_ctype(c_type, direction=READ_BACKWARD) - return self.__build_final_class(fields) + return self.build_c_type(fields) def __prepare_conditional_ctx(self): ctx = {} @@ -285,7 +274,7 @@ def __prepare_conditional_ctx(self): return ctx @staticmethod - def __build_final_class(fields): + def build_c_type(fields): return type( 'Struct', (ctypes.LittleEndianStructure,), @@ -295,34 +284,34 @@ def __build_final_class(fields): }, ) - def to_python(self, ctype_object, *args, **kwargs) -> Union[dict, OrderedDict]: + def to_python(self, ctypes_object, *args, **kwargs) -> Union[dict, OrderedDict]: result = self.dict_type() for name, c_type in self.fields: is_cond = isinstance(c_type, Conditional) result[name] = c_type.to_python( - getattr(ctype_object, name), + getattr(ctypes_object, name), result, *args, **kwargs ) if is_cond else c_type.to_python( - getattr(ctype_object, name), + getattr(ctypes_object, name), *args, **kwargs ) return result - async def to_python_async(self, ctype_object, *args, **kwargs) -> Union[dict, OrderedDict]: + async def to_python_async(self, ctypes_object, *args, **kwargs) -> Union[dict, OrderedDict]: result = self.dict_type() for name, c_type in self.fields: is_cond = isinstance(c_type, Conditional) if is_cond: value = await c_type.to_python_async( - getattr(ctype_object, name), + getattr(ctypes_object, name), result, *args, **kwargs ) else: value = await c_type.to_python_async( - getattr(ctype_object, name), + getattr(ctypes_object, name), *args, **kwargs ) result[name] = value @@ -405,18 +394,18 @@ def __data_class_parse(cls, stream): raise ParseError('Unknown type code: `{}`'.format(type_code)) @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - data_class = cls.__data_class_from_ctype(ctype_object) - return data_class.to_python(ctype_object) + def to_python(cls, ctypes_object, *args, **kwargs): + data_class = cls.__data_class_from_ctype(ctypes_object) + return data_class.to_python(ctypes_object) @classmethod - async def to_python_async(cls, ctype_object, *args, **kwargs): - data_class = cls.__data_class_from_ctype(ctype_object) - return await data_class.to_python_async(ctype_object) + async def to_python_async(cls, ctypes_object, *args, **kwargs): + data_class = cls.__data_class_from_ctype(ctypes_object) + return await data_class.to_python_async(ctypes_object) @classmethod - def __data_class_from_ctype(cls, ctype_object): - type_code = ctype_object.type_code.to_bytes( + def __data_class_from_ctype(cls, ctypes_object): + type_code = ctypes_object.type_code.to_bytes( ctypes.sizeof(ctypes.c_byte), byteorder=PROTOCOL_BYTE_ORDER ) @@ -440,7 +429,7 @@ def _init_python_mapping(cls): int: LongObject, float: DoubleObject, str: String, - bytes: String, + bytes: ByteArrayObject, bytearray: ByteArrayObject, bool: BoolObject, type(None): Null, @@ -455,7 +444,6 @@ def _init_python_mapping(cls): int: LongArrayObject, float: DoubleArrayObject, str: StringArrayObject, - bytes: StringArrayObject, bool: BoolArrayObject, uuid.UUID: UUIDArrayObject, datetime: DateArrayObject, @@ -558,48 +546,33 @@ class AnyDataArray(AnyDataObject): """ counter_type = attr.ib(default=ctypes.c_int) - def build_header(self): - return type( - self.__class__.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('length', self.counter_type), - ], - } - ) - def parse(self, stream): - header, header_class = self.__parse_header(stream) - - fields = [] - for i in range(header.length): + fields, length = self.__parse_header(stream) + for i in range(length): c_type = super().parse(stream) - fields.append(('element_{}'.format(i), c_type)) - - return self.__build_final_class(header_class, fields) + fields.append((f'element_{i}', c_type)) + return self.build_c_type(fields) async def parse_async(self, stream): - header, header_class = self.__parse_header(stream) - - fields = [] - for i in range(header.length): + fields, length = self.__parse_header(stream) + for i in range(length): c_type = await super().parse_async(stream) - fields.append(('element_{}'.format(i), c_type)) - - return self.__build_final_class(header_class, fields) + fields.append((f'element_{i}', c_type)) + return self.build_c_type(fields) def __parse_header(self, stream): - header_class = self.build_header() - header = stream.read_ctype(header_class) - stream.seek(ctypes.sizeof(header_class), SEEK_CUR) - return header, header_class + cnt_sz = ctypes.sizeof(self.counter_type) + length = int.from_bytes( + stream.slice(stream.tell(), cnt_sz), + byteorder=PROTOCOL_BYTE_ORDER + ) + stream.seek(cnt_sz, SEEK_CUR) + return [('length', self.counter_type)], length - def __build_final_class(self, header_class, fields): + def build_c_type(self, fields): return type( self.__class__.__name__, - (header_class,), + (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': fields, @@ -607,56 +580,50 @@ def __build_final_class(self, header_class, fields): ) @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - length = cls.__get_length(ctype_object) + def to_python(cls, ctypes_object, *args, **kwargs): + length = getattr(ctypes_object, "length", 0) return [ - super().to_python(getattr(ctype_object, 'element_{}'.format(i)), *args, **kwargs) + super().to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs) for i in range(length) ] @classmethod - async def to_python_async(cls, ctype_object, *args, **kwargs): - length = cls.__get_length(ctype_object) + async def to_python_async(cls, ctypes_object, *args, **kwargs): + length = getattr(ctypes_object, "length", 0) values = asyncio.gather( *[ super().to_python( - getattr(ctype_object, 'element_{}'.format(i)), + getattr(ctypes_object, f'element_{i}'), *args, **kwargs ) for i in range(length) ] ) return await values - @staticmethod - def __get_length(ctype_object): - return getattr(ctype_object, "length", None) - def from_python(self, stream, value): - try: - length = len(value) - except TypeError: - value = [value] - length = 1 - self.__write_header(stream, length) + value = self.__write_header_and_process_value(stream, value) for x in value: infer_from_python(stream, x) async def from_python_async(self, stream, value): + value = self.__write_header_and_process_value(stream, value) + + for x in value: + await infer_from_python_async(stream, x) + + def __write_header_and_process_value(self, stream, value): try: length = len(value) except TypeError: value = [value] length = 1 - self.__write_header(stream, length) - for x in value: - await infer_from_python_async(stream, x) + stream.write(length.to_bytes( + ctypes.sizeof(self.counter_type), + byteorder=PROTOCOL_BYTE_ORDER + )) - def __write_header(self, stream, length): - header_class = self.build_header() - header = header_class() - header.length = length - stream.write(header) + return value diff --git a/pyignite/datatypes/primitive.py b/pyignite/datatypes/primitive.py index 3bbb196..037f680 100644 --- a/pyignite/datatypes/primitive.py +++ b/pyignite/datatypes/primitive.py @@ -52,8 +52,8 @@ def parse(cls, stream): return cls.c_type @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - return ctype_object + def to_python(cls, ctypes_object, *args, **kwargs): + return ctypes_object class Byte(Primitive): @@ -122,8 +122,8 @@ class Char(Primitive): c_type = ctypes.c_short @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - return ctype_object.value.to_bytes( + def to_python(cls, ctypes_object, *args, **kwargs): + return ctypes_object.value.to_bytes( ctypes.sizeof(cls.c_type), byteorder=PROTOCOL_BYTE_ORDER ).decode(PROTOCOL_CHAR_ENCODING) @@ -147,9 +147,9 @@ class Bool(Primitive): c_type = ctypes.c_byte # Use c_byte because c_bool throws endianness conversion error on BE systems. @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - return ctype_object != 0 + def to_python(cls, ctypes_object, *args, **kwargs): + return ctypes_object != 0 @classmethod - def from_python(cls, stream, value): + def from_python(cls, stream, value, **kwargs): stream.write(struct.pack(" int: return ord(value) @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - value = getattr(ctype_object, "value", None) - if value is None: - return None + def to_python_not_null(cls, ctypes_object, *args, **kwargs): + value = ctypes_object.value return value.to_bytes( ctypes.sizeof(cls.c_type), byteorder=PROTOCOL_BYTE_ORDER @@ -224,8 +218,5 @@ def hashcode(cls, value: bool, *args, **kwargs) -> int: return 1231 if value else 1237 @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - value = getattr(ctype_object, "value", None) - if value is None: - return None - return value != 0 + def to_python_not_null(cls, ctypes_object, *args, **kwargs): + return ctypes_object.value != 0 diff --git a/pyignite/datatypes/standard.py b/pyignite/datatypes/standard.py index 4ca6795..5657afb 100644 --- a/pyignite/datatypes/standard.py +++ b/pyignite/datatypes/standard.py @@ -23,6 +23,7 @@ from pyignite.constants import * from pyignite.utils import datetime_hashcode, decimal_hashcode, hashcode +from .base import IgniteDataType from .type_codes import * from .type_ids import * from .type_names import * @@ -100,14 +101,14 @@ def parse_not_null(cls, stream): return data_type @classmethod - def to_python_not_null(cls, ctype_object, *args, **kwargs): - if ctype_object.length > 0: - return ctype_object.data.decode(PROTOCOL_STRING_ENCODING) + def to_python_not_null(cls, ctypes_object, *args, **kwargs): + if ctypes_object.length > 0: + return ctypes_object.data.decode(PROTOCOL_STRING_ENCODING) return '' @classmethod - def from_python_not_null(cls, stream, value): + def from_python_not_null(cls, stream, value, **kwargs): if isinstance(value, str): value = value.encode(PROTOCOL_STRING_ENCODING) length = len(value) @@ -135,7 +136,7 @@ def hashcode(cls, value: decimal.Decimal, *args, **kwargs) -> int: return decimal_hashcode(value) @classmethod - def build_c_header(cls): + def build_c_type(cls, length): return type( cls.__name__, (ctypes.LittleEndianStructure,), @@ -145,48 +146,41 @@ def build_c_header(cls): ('type_code', ctypes.c_byte), ('scale', ctypes.c_int), ('length', ctypes.c_int), - ], + ('data', ctypes.c_ubyte * length) + ] } ) @classmethod def parse_not_null(cls, stream): - header_class = cls.build_c_header() - header = stream.read_ctype(header_class) - - data_type = type( - cls.__name__, - (header_class,), - { - '_pack_': 1, - '_fields_': [ - ('data', ctypes.c_ubyte * header.length), - ], - } + int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) + length = int.from_bytes( + stream.slice(stream.tell() + int_sz + b_sz, int_sz), + byteorder=PROTOCOL_BYTE_ORDER ) - + data_type = cls.build_c_type(length) stream.seek(ctypes.sizeof(data_type), SEEK_CUR) return data_type @classmethod - def to_python_not_null(cls, ctype_object, *args, **kwargs): - sign = 1 if ctype_object.data[0] & 0x80 else 0 - data = ctype_object.data[1:] - data.insert(0, ctype_object.data[0] & 0x7f) + def to_python_not_null(cls, ctypes_object, *args, **kwargs): + sign = 1 if ctypes_object.data[0] & 0x80 else 0 + data = ctypes_object.data[1:] + data.insert(0, ctypes_object.data[0] & 0x7f) # decode n-byte integer result = sum([ [x for x in reversed(data)][i] * 0x100 ** i for i in range(len(data)) ]) # apply scale - result = result / decimal.Decimal('10') ** decimal.Decimal(ctype_object.scale) + result = result / decimal.Decimal('10') ** decimal.Decimal(ctypes_object.scale) if sign: # apply sign result = -result return result @classmethod - def from_python_not_null(cls, stream, value: decimal.Decimal): + def from_python_not_null(cls, stream, value: decimal.Decimal, **kwargs): sign, digits, scale = value.normalize().as_tuple() integer = int(''.join([str(d) for d in digits])) # calculate number of bytes (at least one, and not forget the sign bit) @@ -202,17 +196,7 @@ def from_python_not_null(cls, stream, value: decimal.Decimal): data[0] |= 0x80 else: data[0] &= 0x7f - header_class = cls.build_c_header() - data_class = type( - cls.__name__, - (header_class,), - { - '_pack_': 1, - '_fields_': [ - ('data', ctypes.c_ubyte * length), - ], - } - ) + data_class = cls.build_c_type(length) data_object = data_class() data_object.type_code = int.from_bytes( cls.type_code, @@ -266,7 +250,7 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def from_python_not_null(cls, stream, value: uuid.UUID): + def from_python_not_null(cls, stream, value: uuid.UUID, **kwargs): data_type = cls.build_c_type() data_object = data_type() data_object.type_code = int.from_bytes( @@ -381,7 +365,7 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def from_python_not_null(cls, stream, value: [date, datetime]): + def from_python_not_null(cls, stream, value: [date, datetime], **kwargs): if type(value) is date: value = datetime.combine(value, time()) data_type = cls.build_c_type() @@ -433,7 +417,7 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def from_python_not_null(cls, stream, value: timedelta): + def from_python_not_null(cls, stream, value: timedelta, **kwargs): data_type = cls.build_c_type() data_object = data_type() data_object.type_code = int.from_bytes( @@ -480,7 +464,7 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def from_python_not_null(cls, stream, value: tuple): + def from_python_not_null(cls, stream, value: tuple, **kwargs): data_type = cls.build_c_type() data_object = data_type() data_object.type_code = int.from_bytes( @@ -505,84 +489,89 @@ class BinaryEnumObject(EnumObject): type_code = TC_BINARY_ENUM -class StandardArray(Nullable): - """ - Base class for array of primitives. Payload-only. - """ - _type_name = None - _type_id = None +class _StandardArrayBase: standard_type = None - type_code = None @classmethod - def build_header_class(cls): - return type( - cls.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('length', ctypes.c_int), - ], - } - ) + def _parse_header(cls, stream): + raise NotImplementedError @classmethod - def parse_not_null(cls, stream): - header_class = cls.build_header_class() - header = stream.read_ctype(header_class) - stream.seek(ctypes.sizeof(header_class), SEEK_CUR) + def _parse(cls, stream): + fields, length = cls._parse_header(stream) - fields = [] - for i in range(header.length): + for i in range(length): c_type = cls.standard_type.parse(stream) - fields.append(('element_{}'.format(i), c_type)) + fields.append((f'element_{i}', c_type)) - final_class = type( + return type( cls.__name__, - (header_class,), + (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': fields, } ) - return final_class @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - length = getattr(ctype_object, "length", None) - if length is None: - return None + def _write_header(cls, stream, value, **kwargs): + raise NotImplementedError - result = [] - for i in range(length): - result.append( - cls.standard_type.to_python( - getattr(ctype_object, 'element_{}'.format(i)), - *args, **kwargs - ) - ) - return result + @classmethod + def _from_python(cls, stream, value, **kwargs): + cls._write_header(stream, value, **kwargs) + for x in value: + cls.standard_type.from_python(stream, x) @classmethod - async def to_python_async(cls, ctypes_object, *args, **kwargs): - return cls.to_python(ctypes_object, *args, **kwargs) + def _to_python(cls, ctypes_object, *args, **kwargs): + length = ctypes_object.length + return [ + cls.standard_type.to_python( + getattr(ctypes_object, f'element_{i}'), *args, **kwargs + ) for i in range(length) + ] + + +class StandardArray(IgniteDataType, _StandardArrayBase): + """ + Base class for array of primitives. Payload-only. + """ + _type_name = None + _type_id = None + type_code = None @classmethod - def from_python_not_null(cls, stream, value, **kwargs): - header_class = cls.build_header_class() - header = header_class() - if hasattr(header, 'type_code'): - header.type_code = int.from_bytes( - cls.type_code, + def _parse_header(cls, stream): + int_sz = ctypes.sizeof(ctypes.c_int) + length = int.from_bytes( + stream.slice(stream.tell(), int_sz), + byteorder=PROTOCOL_BYTE_ORDER + ) + stream.seek(int_sz, SEEK_CUR) + + return [('length', ctypes.c_int)], length + + @classmethod + def parse(cls, stream): + return cls._parse(stream) + + @classmethod + def _write_header(cls, stream, value, **kwargs): + stream.write( + len(value).to_bytes( + length=ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER ) - length = len(value) - header.length = length + ) - stream.write(header) - for x in value: - cls.standard_type.from_python(stream, x) + @classmethod + def from_python(cls, stream, value, **kwargs): + cls._from_python(stream, value, **kwargs) + + @classmethod + def to_python(cls, ctypes_object, *args, **kwargs): + return cls._to_python(ctypes_object, *args, **kwargs) class StringArray(StandardArray): @@ -633,26 +622,47 @@ class EnumArray(StandardArray): standard_type = EnumObject -class StandardArrayObject(StandardArray): +class StandardArrayObject(Nullable, _StandardArrayBase): _type_name = None _type_id = None + standard_type = None + type_code = None pythonic = list default = [] @classmethod - def build_header_class(cls): - return type( - cls.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('type_code', ctypes.c_byte), - ('length', ctypes.c_int), - ], - } + def _parse_header(cls, stream): + int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) + length = int.from_bytes( + stream.slice(stream.tell() + b_sz, int_sz), + byteorder=PROTOCOL_BYTE_ORDER + ) + stream.seek(int_sz + b_sz, SEEK_CUR) + + return [('type_code', ctypes.c_byte), ('length', ctypes.c_int)], length + + @classmethod + def parse_not_null(cls, stream): + return cls._parse(stream) + + @classmethod + def _write_header(cls, stream, value, **kwargs): + stream.write(cls.type_code) + stream.write( + len(value).to_bytes( + length=ctypes.sizeof(ctypes.c_int), + byteorder=PROTOCOL_BYTE_ORDER + ) ) + @classmethod + def from_python_not_null(cls, stream, value, **kwargs): + cls._from_python(stream, value, **kwargs) + + @classmethod + def to_python_not_null(cls, ctypes_object, *args, **kwargs): + return cls._to_python(ctypes_object, *args, **kwargs) + class StringArrayObject(StandardArrayObject): """ List of strings. """ @@ -714,45 +724,43 @@ class EnumArrayObject(StandardArrayObject): standard_type = EnumObject type_code = TC_ENUM_ARRAY + OBJECT = -1 + @classmethod - def build_header_class(cls): - return type( - cls.__name__ + 'Header', - (ctypes.LittleEndianStructure,), - { - '_pack_': 1, - '_fields_': [ - ('type_code', ctypes.c_byte), - ('type_id', ctypes.c_int), - ('length', ctypes.c_int), - ], - } + def _parse_header(cls, stream): + int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) + length = int.from_bytes( + stream.slice(stream.tell() + b_sz + int_sz, int_sz), + byteorder=PROTOCOL_BYTE_ORDER ) + stream.seek(2 * int_sz + b_sz, SEEK_CUR) + return [('type_code', ctypes.c_byte), ('type_id', ctypes.c_int), ('length', ctypes.c_int)], length @classmethod - def from_python_not_null(cls, stream, value, **kwargs): - type_id, value = value - header_class = cls.build_header_class() - header = header_class() - if hasattr(header, 'type_code'): - header.type_code = int.from_bytes( - cls.type_code, + def _write_header(cls, stream, value, type_id=-1): + stream.write(cls.type_code) + stream.write( + type_id.to_bytes( + length=ctypes.sizeof(ctypes.c_int), + byteorder=PROTOCOL_BYTE_ORDER, + signed=True + ) + ) + stream.write( + len(value).to_bytes( + length=ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER ) - length = len(value) - header.length = length - header.type_id = type_id + ) - stream.write(header) - for x in value: - cls.standard_type.from_python(stream, x) + @classmethod + def from_python_not_null(cls, stream, value, **kwargs): + type_id, value = value + super().from_python_not_null(stream, value, type_id=type_id) @classmethod - def to_python_not_null(cls, ctype_object, *args, **kwargs): - type_id = getattr(ctype_object, "type_id", None) - if type_id is None: - return None - return type_id, super().to_python(ctype_object, *args, **kwargs) + def to_python_not_null(cls, ctypes_object, *args, **kwargs): + return ctypes_object.type_id, cls._to_python(ctypes_object, *args, **kwargs) class BinaryEnumArrayObject(EnumArrayObject): diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py index f0338e1..c0311ec 100644 --- a/pyignite/queries/response.py +++ b/pyignite/queries/response.py @@ -128,25 +128,25 @@ async def _parse_success_async(self, stream, fields: list): c_type = await ignite_type.parse_async(stream) fields.append((name, c_type)) - def to_python(self, ctype_object, *args, **kwargs): + def to_python(self, ctypes_object, *args, **kwargs): if not self.following: return None result = OrderedDict() for name, c_type in self.following: result[name] = c_type.to_python( - getattr(ctype_object, name), + getattr(ctypes_object, name), *args, **kwargs ) return result - async def to_python_async(self, ctype_object, *args, **kwargs): + async def to_python_async(self, ctypes_object, *args, **kwargs): if not self.following: return None values = await asyncio.gather( - *[c_type.to_python_async(getattr(ctype_object, name), *args, **kwargs) for name, c_type in self.following] + *[c_type.to_python_async(getattr(ctypes_object, name), *args, **kwargs) for name, c_type in self.following] ) return OrderedDict([(name, values[i]) for i, (name, _) in enumerate(self.following)]) @@ -239,13 +239,13 @@ def __body_class_post_process(body_class, fields, data_fields): ('more', ctypes.c_byte), ] - def to_python(self, ctype_object, *args, **kwargs): - if getattr(ctype_object, 'status_code', 0) == 0: - result = self.__to_python_result_header(ctype_object, *args, **kwargs) + def to_python(self, ctypes_object, *args, **kwargs): + if getattr(ctypes_object, 'status_code', 0) == 0: + result = self.__to_python_result_header(ctypes_object, *args, **kwargs) - for row_item in ctype_object.data._fields_: + for row_item in ctypes_object.data._fields_: row_name = row_item[0] - row_object = getattr(ctype_object.data, row_name) + row_object = getattr(ctypes_object.data, row_name) row = [] for col_item in row_object._fields_: col_name = col_item[0] @@ -254,14 +254,14 @@ def to_python(self, ctype_object, *args, **kwargs): result['data'].append(row) return result - async def to_python_async(self, ctype_object, *args, **kwargs): - if getattr(ctype_object, 'status_code', 0) == 0: - result = self.__to_python_result_header(ctype_object, *args, **kwargs) + async def to_python_async(self, ctypes_object, *args, **kwargs): + if getattr(ctypes_object, 'status_code', 0) == 0: + result = self.__to_python_result_header(ctypes_object, *args, **kwargs) data_coro = [] - for row_item in ctype_object.data._fields_: + for row_item in ctypes_object.data._fields_: row_name = row_item[0] - row_object = getattr(ctype_object.data, row_name) + row_object = getattr(ctypes_object.data, row_name) row_coro = [] for col_item in row_object._fields_: col_name = col_item[0] @@ -274,18 +274,18 @@ async def to_python_async(self, ctype_object, *args, **kwargs): return result @staticmethod - def __to_python_result_header(ctype_object, *args, **kwargs): + def __to_python_result_header(ctypes_object, *args, **kwargs): result = { - 'more': Bool.to_python(ctype_object.more, *args, **kwargs), + 'more': Bool.to_python(ctypes_object.more, *args, **kwargs), 'data': [], } - if hasattr(ctype_object, 'fields'): - result['fields'] = StringArray.to_python(ctype_object.fields, *args, **kwargs) + if hasattr(ctypes_object, 'fields'): + result['fields'] = StringArray.to_python(ctypes_object.fields, *args, **kwargs) else: - result['field_count'] = Int.to_python(ctype_object.field_count, *args, **kwargs) + result['field_count'] = Int.to_python(ctypes_object.field_count, *args, **kwargs) - if hasattr(ctype_object, 'cursor'): - result['cursor'] = Long.to_python(ctype_object.cursor, *args, **kwargs) + if hasattr(ctypes_object, 'cursor'): + result['cursor'] = Long.to_python(ctypes_object.cursor, *args, **kwargs) return result @@ -328,26 +328,26 @@ def __process_type_exists(stream, fields): return type_exists - def to_python(self, ctype_object, *args, **kwargs): - if getattr(ctype_object, 'status_code', 0) == 0: + def to_python(self, ctypes_object, *args, **kwargs): + if getattr(ctypes_object, 'status_code', 0) == 0: result = { - 'type_exists': Bool.to_python(ctype_object.type_exists) + 'type_exists': Bool.to_python(ctypes_object.type_exists) } - if hasattr(ctype_object, 'body'): - result.update(body_struct.to_python(ctype_object.body)) + if hasattr(ctypes_object, 'body'): + result.update(body_struct.to_python(ctypes_object.body)) - if hasattr(ctype_object, 'enums'): - result['enums'] = enum_struct.to_python(ctype_object.enums) + if hasattr(ctypes_object, 'enums'): + result['enums'] = enum_struct.to_python(ctypes_object.enums) - if hasattr(ctype_object, 'schema'): + if hasattr(ctypes_object, 'schema'): result['schema'] = { x['schema_id']: [ z['schema_field_id'] for z in x['schema_fields'] ] - for x in schema_struct.to_python(ctype_object.schema) + for x in schema_struct.to_python(ctypes_object.schema) } return result - async def to_python_async(self, ctype_object, *args, **kwargs): - return self.to_python(ctype_object, *args, **kwargs) + async def to_python_async(self, ctypes_object, *args, **kwargs): + return self.to_python(ctypes_object, *args, **kwargs) diff --git a/tests/common/test_datatypes.py b/tests/common/test_datatypes.py index c1aa19f..6771f94 100644 --- a/tests/common/test_datatypes.py +++ b/tests/common/test_datatypes.py @@ -50,6 +50,7 @@ # arrays of integers ([1, 2, 3, 5], None), + (b'buzz', None), (b'buzz', ByteArrayObject), (bytearray([7, 8, 8, 11]), None), (bytearray([7, 8, 8, 11]), ByteArrayObject), @@ -122,7 +123,7 @@ ((-1, [(6001, 1), (6002, 2), (6003, 3)]), BinaryEnumArrayObject), # object array - ((ObjectArrayObject.OBJECT, [1, 2, decimal.Decimal('3')]), ObjectArrayObject), + ((ObjectArrayObject.OBJECT, [1, 2, decimal.Decimal('3'), bytearray(b'\x10\x20')]), ObjectArrayObject), # collection ((CollectionObject.LINKED_LIST, [1, 2, 3]), None), @@ -153,42 +154,47 @@ async def test_put_get_data_async(async_cache, value, value_hint): bytearray_params = [ - [1, 2, 3, 5], - (7, 8, 13, 18), - (-128, -1, 0, 1, 127, 255), + ([1, 2, 3, 5], ByteArrayObject), + ((7, 8, 13, 18), ByteArrayObject), + ((-128, -1, 0, 1, 127, 255), ByteArrayObject), + (b'\x01\x03\x10', None), + (bytearray(b'\x01\x30'), None) ] @pytest.mark.parametrize( - 'value', + 'value,type_hint', bytearray_params ) -def test_bytearray_from_list_or_tuple(cache, value): +def test_bytearray_from_different_input(cache, value, type_hint): """ ByteArrayObject's pythonic type is `bytearray`, but it should also accept lists or tuples as a content. """ - - cache.put('my_key', value, value_hint=ByteArrayObject) - - assert cache.get('my_key') == bytearray([unsigned(ch, ctypes.c_ubyte) for ch in value]) + cache.put('my_key', value, value_hint=type_hint) + __check_bytearray_from_different_input(cache.get('my_key'), value) @pytest.mark.parametrize( - 'value', + 'value,type_hint', bytearray_params ) @pytest.mark.asyncio -async def test_bytearray_from_list_or_tuple_async(async_cache, value): +async def test_bytearray_from_different_input_async(async_cache, value, type_hint): """ ByteArrayObject's pythonic type is `bytearray`, but it should also accept lists or tuples as a content. """ - await async_cache.put('my_key', value, value_hint=ByteArrayObject) + __check_bytearray_from_different_input(await async_cache.get('my_key'), value) + - result = await async_cache.get('my_key') - assert result == bytearray([unsigned(ch, ctypes.c_ubyte) for ch in value]) +def __check_bytearray_from_different_input(result, value): + if isinstance(value, (bytes, bytearray)): + assert isinstance(result, bytes) + assert value == result + else: + assert result == bytearray([unsigned(ch, ctypes.c_ubyte) for ch in value]) uuid_params = [ diff --git a/tests/common/test_key_value.py b/tests/common/test_key_value.py index 6e6df61..b03bec2 100644 --- a/tests/common/test_key_value.py +++ b/tests/common/test_key_value.py @@ -422,10 +422,13 @@ async def test_put_get_collection_async(async_cache, key, hinted_value, value): @pytest.fixture def complex_map(): return {"test" + str(i): ((MapObject.HASH_MAP, - {"key_1": ((1, ["value_1", 1.0]), CollectionObject), - "key_2": ((1, [["value_2_1", "1.0"], ["value_2_2", "0.25"]]), CollectionObject), - "key_3": ((1, [["value_3_1", "1.0"], ["value_3_2", "0.25"]]), CollectionObject), - "key_4": ((1, [["value_4_1", "1.0"], ["value_4_2", "0.25"]]), CollectionObject), + {"key_1": ((CollectionObject.ARR_LIST, ["value_1", 1.0]), CollectionObject), + "key_2": ((CollectionObject.ARR_LIST, [["value_2_1", "1.0"], ["value_2_2", "0.25"]]), + CollectionObject), + "key_3": ((CollectionObject.ARR_LIST, [["value_3_1", "1.0"], ["value_3_2", "0.25"]]), + CollectionObject), + "key_4": ((CollectionObject.ARR_LIST, [["value_4_1", "1.0"], ["value_4_2", "0.25"]]), + CollectionObject), 'key_5': False, "key_6": "value_6"}), MapObject) for i in range(10000)} diff --git a/tests/common/test_sql.py b/tests/common/test_sql.py index 0841b7f..b947fbc 100644 --- a/tests/common/test_sql.py +++ b/tests/common/test_sql.py @@ -325,3 +325,131 @@ async def async_inner(): assert test_value == received return async_inner() if isinstance(cache, AioCache) else inner() + + +VARBIN_CREATE_QUERY = 'CREATE TABLE VarbinTable(id int primary key, varbin VARBINARY)' +VARBIN_DROP_QUERY = 'DROP TABLE VarbinTable' +VARBIN_MERGE_QUERY = 'MERGE INTO VarbinTable(id, varbin) VALUES (?, ?)' +VARBIN_SELECT_QUERY = 'SELECT * FROM VarbinTable' + +VARBIN_TEST_PARAMS = [ + bytearray('Test message', 'UTF-8'), + bytes('Test message', 'UTF-8') +] + + +@pytest.fixture +def varbin_table(client): + client.sql(VARBIN_CREATE_QUERY) + yield None + client.sql(VARBIN_DROP_QUERY) + + +@pytest.mark.parametrize( + 'value', VARBIN_TEST_PARAMS +) +def test_sql_cache_varbinary_handling(client, varbin_table, value): + client.sql(VARBIN_MERGE_QUERY, query_args=(1, value)) + with client.sql(VARBIN_SELECT_QUERY) as cursor: + for row in cursor: + assert isinstance(row[1], bytes) + assert row[1] == value + break + + +@pytest.fixture +async def varbin_table_async(async_client): + await async_client.sql(VARBIN_CREATE_QUERY) + yield None + await async_client.sql(VARBIN_DROP_QUERY) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'value', VARBIN_TEST_PARAMS +) +async def test_sql_cache_varbinary_handling_async(async_client, varbin_table_async, value): + await async_client.sql(VARBIN_MERGE_QUERY, query_args=(1, value)) + async with async_client.sql(VARBIN_SELECT_QUERY) as cursor: + async for row in cursor: + assert isinstance(row[1], bytes) + assert row[1] == value + break + + +@pytest.fixture +def varbin_cache_settings(): + cache_name = 'varbin_cache' + table_name = f'{cache_name}_table'.upper() + + yield { + PROP_NAME: cache_name, + PROP_SQL_SCHEMA: 'PUBLIC', + PROP_CACHE_MODE: CacheMode.PARTITIONED, + PROP_QUERY_ENTITIES: [ + { + 'table_name': table_name, + 'key_field_name': 'ID', + 'value_field_name': 'VALUE', + 'key_type_name': 'java.lang.Long', + 'value_type_name': 'byte[]', + 'query_indexes': [], + 'field_name_aliases': [], + 'query_fields': [ + { + 'name': 'ID', + 'type_name': 'java.lang.Long', + 'is_key_field': True, + 'is_notnull_constraint_field': True, + }, + { + 'name': 'VALUE', + 'type_name': 'byte[]', + }, + ], + }, + ], + } + + +VARBIN_CACHE_TABLE_NAME = 'varbin_cache_table'.upper() +VARBIN_CACHE_SELECT_QUERY = f'SELECT * FROM {VARBIN_CACHE_TABLE_NAME}' + + +@pytest.fixture +def varbin_cache(client, varbin_cache_settings): + cache = client.get_or_create_cache(varbin_cache_settings) + yield cache + cache.destroy() + + +@pytest.mark.parametrize( + 'value', VARBIN_TEST_PARAMS +) +def test_cache_varbinary_handling(client, varbin_cache, value): + varbin_cache.put(1, value) + with client.sql(VARBIN_CACHE_SELECT_QUERY) as cursor: + for row in cursor: + assert isinstance(row[1], bytes) + assert row[1] == value + break + + +@pytest.fixture +async def varbin_cache_async(async_client, varbin_cache_settings): + cache = await async_client.get_or_create_cache(varbin_cache_settings) + yield cache + await cache.destroy() + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'value', VARBIN_TEST_PARAMS +) +async def test_cache_varbinary_handling_async(async_client, varbin_cache_async, value): + await varbin_cache_async.put(1, value) + async with async_client.sql(VARBIN_CACHE_SELECT_QUERY) as cursor: + async for row in cursor: + assert isinstance(row[1], bytes) + assert row[1] == value + break