diff --git a/.gitignore b/.gitignore index 7372921..d28510c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ .eggs .pytest_cache .tox +tests/config/*.xml +junit*.xml pyignite.egg-info ignite-log-* -__pycache__ \ No newline at end of file +__pycache__ diff --git a/pyignite/api/affinity.py b/pyignite/api/affinity.py index d28cfb8..16148a1 100644 --- a/pyignite/api/affinity.py +++ b/pyignite/api/affinity.py @@ -55,12 +55,12 @@ partition_mapping = StructArray([ ('is_applicable', Bool), - ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01', - lambda ctx: ctx['is_applicable'] is True, + ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1, + lambda ctx: ctx['is_applicable'], cache_mapping, empty_cache_mapping)), - ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01', - lambda ctx: ctx['is_applicable'] is True, + ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1, + lambda ctx: ctx['is_applicable'], node_mapping, empty_node_mapping)), ]) diff --git a/pyignite/api/binary.py b/pyignite/api/binary.py index 722001a..0e63c17 100644 --- a/pyignite/api/binary.py +++ b/pyignite/api/binary.py @@ -24,16 +24,15 @@ from pyignite.queries.op_codes import * from pyignite.utils import int_overflow, entity_id from .result import APIResult +from ..stream import BinaryStream, READ_BACKWARD from ..queries.response import Response -def get_binary_type( - connection: 'Connection', binary_type: Union[str, int], query_id=None, -) -> APIResult: +def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult: """ Gets the binary type information by type ID. - :param connection: connection to Ignite server, + :param conn: connection to Ignite server, :param binary_type: binary type name or ID, :param query_id: (optional) a value generated by client and returned as-is in response.query_id. When the parameter is omitted, a random value @@ -49,39 +48,42 @@ def get_binary_type( query_id=query_id, ) - _, send_buffer = query_struct.from_python({ - 'type_id': entity_id(binary_type), - }) - connection.send(send_buffer) + with BinaryStream(conn) as stream: + query_struct.from_python(stream, { + 'type_id': entity_id(binary_type), + }) + conn.send(stream.getbuffer()) - response_head_struct = Response(protocol_version=connection.get_protocol_version(), + response_head_struct = Response(protocol_version=conn.get_protocol_version(), following=[('type_exists', Bool)]) - response_head_type, recv_buffer = response_head_struct.parse(connection) - response_head = response_head_type.from_buffer_copy(recv_buffer) - response_parts = [] - if response_head.type_exists: - resp_body_type, resp_body_buffer = body_struct.parse(connection) - response_parts.append(('body', resp_body_type)) - resp_body = resp_body_type.from_buffer_copy(resp_body_buffer) - recv_buffer += resp_body_buffer - if resp_body.is_enum: - resp_enum, resp_enum_buffer = enum_struct.parse(connection) - response_parts.append(('enums', resp_enum)) - recv_buffer += resp_enum_buffer - resp_schema_type, resp_schema_buffer = schema_struct.parse(connection) - response_parts.append(('schema', resp_schema_type)) - recv_buffer += resp_schema_buffer - - response_class = type( - 'GetBinaryTypeResponse', - (response_head_type,), - { - '_pack_': 1, - '_fields_': response_parts, - } - ) - response = response_class.from_buffer_copy(recv_buffer) + with BinaryStream(conn, conn.recv()) as stream: + init_pos = stream.tell() + response_head_type = response_head_struct.parse(stream) + response_head = stream.read_ctype(response_head_type, direction=READ_BACKWARD) + + response_parts = [] + if response_head.type_exists: + resp_body_type = body_struct.parse(stream) + response_parts.append(('body', resp_body_type)) + resp_body = stream.read_ctype(resp_body_type, direction=READ_BACKWARD) + if resp_body.is_enum: + resp_enum = enum_struct.parse(stream) + response_parts.append(('enums', resp_enum)) + + resp_schema_type = schema_struct.parse(stream) + response_parts.append(('schema', resp_schema_type)) + + response_class = type( + 'GetBinaryTypeResponse', + (response_head_type,), + { + '_pack_': 1, + '_fields_': response_parts, + } + ) + response = stream.read_ctype(response_class, position=init_pos) + result = APIResult(response) if result.status != 0: return result diff --git a/pyignite/binary.py b/pyignite/binary.py index 5d76c1b..da62bb5 100644 --- a/pyignite/binary.py +++ b/pyignite/binary.py @@ -102,18 +102,17 @@ def __new__( mcs, name, (GenericObjectProps, )+base_classes, namespace ) - def _build(self, client: 'Client' = None) -> int: + def _from_python(self, stream, save_to_buf=False): """ Method for building binary representation of the Generic object and calculating a hashcode from it. :param self: Generic object instance, - :param client: (optional) connection to Ignite cluster, + :param stream: BinaryStream + :param save_to_buf: Optional. If True, save serialized data to buffer. """ - if client is None: - compact_footer = True - else: - compact_footer = client.compact_footer + + compact_footer = stream.compact_footer # prepare header header_class = BinaryObject.build_header() @@ -129,18 +128,19 @@ def _build(self, client: 'Client' = None) -> int: header.type_id = self.type_id header.schema_id = self.schema_id + header_len = ctypes.sizeof(header_class) + initial_pos = stream.tell() + # create fields and calculate offsets offsets = [ctypes.sizeof(header_class)] - field_buffer = bytearray() schema_items = list(self.schema.items()) + + stream.seek(initial_pos + header_len) for field_name, field_type in schema_items: - partial_buffer = field_type.from_python( - getattr( - self, field_name, getattr(field_type, 'default', None) - ) - ) - offsets.append(max(offsets) + len(partial_buffer)) - field_buffer += partial_buffer + val = getattr(self, field_name, getattr(field_type, 'default', None)) + field_start_pos = stream.tell() + field_type.from_python(stream, val) + offsets.append(max(offsets) + stream.tell() - field_start_pos) offsets = offsets[:-1] @@ -160,15 +160,18 @@ def _build(self, client: 'Client' = None) -> int: schema[i].offset = offset # calculate size and hash code - header.schema_offset = ( - ctypes.sizeof(header_class) - + len(field_buffer) - ) + fields_data_len = stream.tell() - initial_pos - header_len + header.schema_offset = fields_data_len + header_len header.length = header.schema_offset + ctypes.sizeof(schema_class) - header.hash_code = hashcode(field_buffer) + header.hash_code = stream.hashcode(initial_pos + header_len, fields_data_len) + + stream.seek(initial_pos) + stream.write(header) + stream.seek(initial_pos + header.schema_offset) + stream.write(schema) - # reuse the results - self._buffer = bytes(header) + field_buffer + bytes(schema) + if save_to_buf: + self._buffer = bytes(stream.mem_view(initial_pos, stream.tell() - initial_pos)) self._hashcode = header.hash_code def _setattr(self, attr_name: str, attr_value: Any): @@ -180,7 +183,7 @@ def _setattr(self, attr_name: str, attr_value: Any): # `super()` is really need these parameters super(result, self).__setattr__(attr_name, attr_value) - setattr(result, _build.__name__, _build) + setattr(result, _from_python.__name__, _from_python) setattr(result, '__setattr__', _setattr) setattr(result, '_buffer', None) setattr(result, '_hashcode', None) diff --git a/pyignite/cache.py b/pyignite/cache.py index 64093e8..dd7dac4 100644 --- a/pyignite/cache.py +++ b/pyignite/cache.py @@ -17,7 +17,7 @@ from typing import Any, Dict, Iterable, Optional, Tuple, Union from .constants import * -from .binary import GenericObjectMeta +from .binary import GenericObjectMeta, unwrap_binary from .datatypes import prop_codes from .datatypes.internal import AnyDataObject from .exceptions import ( @@ -26,7 +26,7 @@ ) from .utils import ( cache_id, get_field_by_id, is_wrapped, - status_to_exception, unsigned, unwrap_binary, + status_to_exception, unsigned ) from .api.cache_config import ( cache_create, cache_create_with_config, diff --git a/pyignite/connection/__init__.py b/pyignite/connection/__init__.py index cf40718..0e793f8 100644 --- a/pyignite/connection/__init__.py +++ b/pyignite/connection/__init__.py @@ -35,7 +35,7 @@ from collections import OrderedDict import socket -from threading import Lock +from threading import RLock from typing import Union from pyignite.constants import * @@ -52,6 +52,8 @@ __all__ = ['Connection'] +from ..stream import BinaryStream, READ_BACKWARD + class Connection: """ @@ -60,8 +62,7 @@ class Connection: * socket wrapper. Detects fragmentation and network errors. See also https://docs.python.org/3/howto/sockets.html, - * binary protocol connector. Incapsulates handshake, data read-ahead and - failover reconnection. + * binary protocol connector. Incapsulates handshake and failover reconnection. """ _socket = None @@ -72,7 +73,6 @@ class Connection: host = None port = None timeout = None - prefetch = None username = None password = None ssl_params = {} @@ -97,7 +97,7 @@ def _check_ssl_params(params): ).format(param)) def __init__( - self, client: 'Client', prefetch: bytes = b'', timeout: int = None, + self, client: 'Client', timeout: float = 2.0, username: str = None, password: str = None, **ssl_params ): """ @@ -107,8 +107,6 @@ def __init__( https://docs.python.org/3/library/ssl.html#ssl-certificates. :param client: Ignite client object, - :param prefetch: (optional) initialize the read-ahead data buffer. - Empty by default, :param timeout: (optional) sets timeout (in seconds) for each socket operation including `connect`. 0 means non-blocking mode, which is virtually guaranteed to fail. Can accept integer or float value. @@ -143,7 +141,6 @@ def __init__( :param password: (optional) password to authenticate to Ignite cluster. """ self.client = client - self.prefetch = prefetch self.timeout = timeout self.username = username self.password = password @@ -152,7 +149,8 @@ def __init__( ssl_params['use_ssl'] = True self.ssl_params = ssl_params self._failed = False - self._in_use = Lock() + self._mux = RLock() + self._in_use = False @property def socket(self) -> socket.socket: @@ -162,17 +160,20 @@ def socket(self) -> socket.socket: @property def closed(self) -> bool: """ Tells if socket is closed. """ - return self._socket is None + with self._mux: + return self._socket is None @property def failed(self) -> bool: """ Tells if connection is failed. """ - return self._failed + with self._mux: + return self._failed @property def alive(self) -> bool: """ Tells if connection is up and no failure detected. """ - return not (self._failed or self.closed) + with self._mux: + return not (self._failed or self.closed) def __repr__(self) -> str: return '{}:{}'.format(self.host or '?', self.port or '?') @@ -189,8 +190,10 @@ def get_protocol_version(self): def _fail(self): """ set client to failed state. """ - self._failed = True - self._in_use.release() + with self._mux: + self._failed = True + + self._in_use = False def read_response(self) -> Union[dict, OrderedDict]: """ @@ -202,26 +205,27 @@ def read_response(self) -> Union[dict, OrderedDict]: ('length', Int), ('op_code', Byte), ]) - start_class, start_buffer = response_start.parse(self) - start = start_class.from_buffer_copy(start_buffer) - data = response_start.to_python(start) - response_end = None - if data['op_code'] == 0: - response_end = Struct([ - ('version_major', Short), - ('version_minor', Short), - ('version_patch', Short), - ('message', String), - ]) - elif self.get_protocol_version() >= (1, 4, 0): - response_end = Struct([ - ('node_uuid', UUIDObject), - ]) - if response_end: - end_class, end_buffer = response_end.parse(self) - end = end_class.from_buffer_copy(end_buffer) - data.update(response_end.to_python(end)) - return data + with BinaryStream(self, self.recv()) as stream: + start_class = response_start.parse(stream) + start = stream.read_ctype(start_class, direction=READ_BACKWARD) + data = response_start.to_python(start) + response_end = None + if data['op_code'] == 0: + response_end = Struct([ + ('version_major', Short), + ('version_minor', Short), + ('version_patch', Short), + ('message', String), + ]) + elif self.get_protocol_version() >= (1, 4, 0): + response_end = Struct([ + ('node_uuid', UUIDObject), + ]) + if response_end: + end_class = response_end.parse(stream) + end = stream.read_ctype(end_class, direction=READ_BACKWARD) + data.update(response_end.to_python(end)) + return data def connect( self, host: str = None, port: int = None @@ -234,9 +238,10 @@ def connect( """ detecting_protocol = False - # go non-blocking for faster reconnect - if not self._in_use.acquire(blocking=False): - raise ConnectionError('Connection is in use.') + with self._mux: + if self._in_use: + raise ConnectionError('Connection is in use.') + self._in_use = True # choose highest version first if self.client.protocol_version is None: @@ -289,7 +294,11 @@ def _connect_version( self.username, self.password ) - self.send(hs_request) + + with BinaryStream(self) as stream: + hs_request.from_python(stream) + self.send(stream.getbuffer()) + hs_response = self.read_response() if hs_response['op_code'] == 0: # disconnect but keep in use @@ -345,12 +354,7 @@ def _reconnect(self): if not self.failed: return - # return connection to initial state regardless of use lock - self.close(release=False) - try: - self._in_use.release() - except RuntimeError: - pass + self.close() # connect and silence the connection errors try: @@ -370,20 +374,7 @@ def _transfer_params(self, to: 'Connection'): to.host = self.host to.port = self.port - def clone(self, prefetch: bytes = b'') -> 'Connection': - """ - Clones this connection in its current state. - - :return: `Connection` object. - """ - clone = self.__class__(self.client, **self.ssl_params) - self._transfer_params(to=clone) - if self.alive: - clone.connect(self.host, self.port) - clone.prefetch = prefetch - return clone - - def send(self, data: bytes, flags=None): + def send(self, data: Union[bytes, bytearray, memoryview], flags=None): """ Send data down the socket. @@ -396,70 +387,45 @@ def send(self, data: bytes, flags=None): kwargs = {} if flags is not None: kwargs['flags'] = flags - data = bytes(data) - total_bytes_sent = 0 - - while total_bytes_sent < len(data): - try: - bytes_sent = self.socket.send( - data[total_bytes_sent:], - **kwargs - ) - except connection_errors: - self._fail() - self.reconnect() - raise - if bytes_sent == 0: - self._fail() - self.reconnect() - raise SocketError('Connection broken.') - total_bytes_sent += bytes_sent - - def recv(self, buffersize, flags=None) -> bytes: - """ - Receive data from socket or read-ahead buffer. - :param buffersize: bytes to receive, - :param flags: (optional) OS-specific flags, - :return: data received. - """ + try: + self.socket.sendall(data, **kwargs) + except Exception: + self._fail() + self.reconnect() + raise + + def recv(self, flags=None) -> bytearray: + def _recv(buffer, num_bytes): + bytes_to_receive = num_bytes + while bytes_to_receive > 0: + try: + bytes_rcvd = self.socket.recv_into(buffer, bytes_to_receive, **kwargs) + if bytes_rcvd == 0: + raise SocketError('Connection broken.') + except connection_errors: + self._fail() + self.reconnect() + raise + + buffer = buffer[bytes_rcvd:] + bytes_to_receive -= bytes_rcvd + if self.closed: raise SocketError('Attempt to use closed connection.') - pref_size = len(self.prefetch) - if buffersize > pref_size: - result = self.prefetch - self.prefetch = b'' - try: - result += self._recv(buffersize-pref_size, flags) - except connection_errors: - self._fail() - self.reconnect() - raise - return result - else: - result = self.prefetch[:buffersize] - self.prefetch = self.prefetch[buffersize:] - return result - - def _recv(self, buffersize, flags=None) -> bytes: - """ - Handle socket data reading. - """ kwargs = {} if flags is not None: kwargs['flags'] = flags - chunks = [] - bytes_rcvd = 0 - while bytes_rcvd < buffersize: - chunk = self.socket.recv(buffersize-bytes_rcvd, **kwargs) - if chunk == b'': - raise SocketError('Connection broken.') - chunks.append(chunk) - bytes_rcvd += len(chunk) + data = bytearray(4) + _recv(memoryview(data), 4) + response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER) + + data.extend(bytearray(response_len)) + _recv(memoryview(data)[4:], response_len) + return data - return b''.join(chunks) def close(self, release=True): """ @@ -467,16 +433,14 @@ def close(self, release=True): not required, since sockets are automatically closed when garbage-collected. """ - if release: - try: - self._in_use.release() - except RuntimeError: - pass - - if self._socket: - try: - self._socket.shutdown(socket.SHUT_RDWR) - self._socket.close() - except connection_errors: - pass - self._socket = None + with self._mux: + if self._socket: + try: + self._socket.shutdown(socket.SHUT_RDWR) + self._socket.close() + except connection_errors: + pass + self._socket = None + + if release: + self._in_use = False diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py index 2e0264f..3315c4e 100644 --- a/pyignite/connection/handshake.py +++ b/pyignite/connection/handshake.py @@ -50,7 +50,7 @@ def __init__( ]) self.handshake_struct = Struct(fields) - def __bytes__(self) -> bytes: + def from_python(self, stream): handshake_data = { 'length': 8, 'op_code': OP_HANDSHAKE, @@ -69,4 +69,5 @@ def __bytes__(self) -> bytes: len(self.username), len(self.password), ]) - return self.handshake_struct.from_python(handshake_data) + + self.handshake_struct.from_python(stream, handshake_data) diff --git a/pyignite/datatypes/__init__.py b/pyignite/datatypes/__init__.py index 5024f79..49860bd 100644 --- a/pyignite/datatypes/__init__.py +++ b/pyignite/datatypes/__init__.py @@ -25,3 +25,22 @@ from .primitive_arrays import * from .primitive_objects import * from .standard import * +from ..stream import BinaryStream, READ_BACKWARD + + +def unwrap_binary(client: 'Client', wrapped: tuple) -> object: + """ + Unwrap wrapped BinaryObject and convert it to Python data. + + :param client: connection to Ignite cluster, + :param wrapped: `WrappedDataObject` value, + :return: dict representing wrapped BinaryObject. + """ + from pyignite.datatypes.complex import BinaryObject + + blob, offset = wrapped + with BinaryStream(client.random_node, blob) as stream: + data_class = BinaryObject.parse(stream) + result = BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), client) + + return result diff --git a/pyignite/datatypes/cache_properties.py b/pyignite/datatypes/cache_properties.py index e94db5f..eadaef9 100644 --- a/pyignite/datatypes/cache_properties.py +++ b/pyignite/datatypes/cache_properties.py @@ -92,10 +92,11 @@ def build_header(cls): ) @classmethod - def parse(cls, connection: 'Connection'): + def parse(cls, stream): + init_pos = stream.tell() header_class = cls.build_header() - header_buffer = connection.recv(ctypes.sizeof(header_class)) - data_class, data_buffer = cls.prop_data_class.parse(connection) + data_class = cls.prop_data_class.parse(stream) + prop_class = type( cls.__name__, (header_class,), @@ -106,7 +107,9 @@ def parse(cls, connection: 'Connection'): ], } ) - return prop_class, header_buffer + data_buffer + + stream.seek(init_pos + ctypes.sizeof(prop_class)) + return prop_class @classmethod def to_python(cls, ctype_object, *args, **kwargs): @@ -115,11 +118,12 @@ def to_python(cls, ctype_object, *args, **kwargs): ) @classmethod - def from_python(cls, value): + def from_python(cls, stream, value): header_class = cls.build_header() header = header_class() header.prop_code = cls.prop_code - return bytes(header) + cls.prop_data_class.from_python(value) + stream.write(bytes(header)) + cls.prop_data_class.from_python(stream, value) class PropName(PropBase): @@ -275,7 +279,7 @@ class PropStatisticsEnabled(PropBase): class AnyProperty(PropBase): @classmethod - def from_python(cls, value): + def from_python(cls, stream, value): raise Exception( 'You must choose a certain type ' 'for your cache configuration property' diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py index 6860583..aed3cda 100644 --- a/pyignite/datatypes/complex.py +++ b/pyignite/datatypes/complex.py @@ -15,27 +15,27 @@ from collections import OrderedDict import ctypes -import inspect +from io import SEEK_CUR from typing import Iterable, Dict from pyignite.constants import * from pyignite.exceptions import ParseError - from .base import IgniteDataType from .internal import AnyDataObject, infer_from_python from .type_codes import * from .type_ids import * from .type_names import * -from .null_object import Null - +from .null_object import Null, Nullable __all__ = [ 'Map', 'ObjectArrayObject', 'CollectionObject', 'MapObject', 'WrappedDataObject', 'BinaryObject', ] +from ..stream import BinaryStream + -class ObjectArrayObject(IgniteDataType): +class ObjectArrayObject(IgniteDataType, Nullable): """ Array of Ignite objects of any consistent type. Its Python representation is tuple(type_id, iterable of any type). The only type ID that makes sense @@ -69,20 +69,14 @@ def build_header(cls): ) @classmethod - def parse(cls, client: 'Client'): - tc_type = client.recv(ctypes.sizeof(ctypes.c_byte)) - - if tc_type == TC_NULL: - return Null.build_c_type(), tc_type - + def parse_not_null(cls, stream): header_class = cls.build_header() - buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type)) - header = header_class.from_buffer_copy(buffer) - fields = [] + header = stream.read_ctype(header_class) + stream.seek(ctypes.sizeof(header_class), SEEK_CUR) + fields = [] for i in range(header.length): - c_type, buffer_fragment = AnyDataObject.parse(client) - buffer += buffer_fragment + c_type = AnyDataObject.parse(stream) fields.append(('element_{}'.format(i), c_type)) final_class = type( @@ -93,15 +87,13 @@ def parse(cls, client: 'Client'): '_fields_': fields, } ) - return final_class, buffer + + return final_class @classmethod - def to_python(cls, ctype_object, *args, **kwargs): + def to_python_not_null(cls, ctype_object, *args, **kwargs): result = [] - length = getattr(ctype_object, "length", None) - if length is None: - return None - for i in range(length): + for i in range(ctype_object.length): result.append( AnyDataObject.to_python( getattr(ctype_object, 'element_{}'.format(i)), @@ -111,10 +103,7 @@ def to_python(cls, ctype_object, *args, **kwargs): return ctype_object.type_id, result @classmethod - def from_python(cls, value): - if value is None: - return Null.from_python() - + def from_python_not_null(cls, stream, value): type_or_id, value = value header_class = cls.build_header() header = header_class() @@ -129,14 +118,13 @@ def from_python(cls, value): length = 1 header.length = length header.type_id = type_or_id - buffer = bytearray(header) + stream.write(header) for x in value: - buffer += infer_from_python(x) - return bytes(buffer) + infer_from_python(stream, x) -class WrappedDataObject(IgniteDataType): +class WrappedDataObject(IgniteDataType, Nullable): """ One or more binary objects can be wrapped in an array. This allows reading, storing, passing and writing objects efficiently without understanding @@ -162,15 +150,9 @@ def build_header(cls): ) @classmethod - def parse(cls, client: 'Client'): - tc_type = client.recv(ctypes.sizeof(ctypes.c_byte)) - - if tc_type == TC_NULL: - return Null.build_c_type(), tc_type - + def parse_not_null(cls, stream): header_class = cls.build_header() - buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type)) - header = header_class.from_buffer_copy(buffer) + header = stream.read_ctype(header_class) final_class = type( cls.__name__, @@ -183,21 +165,20 @@ def parse(cls, client: 'Client'): ], } ) - buffer += client.recv( - ctypes.sizeof(final_class) - ctypes.sizeof(header_class) - ) - return final_class, buffer + + stream.seek(ctypes.sizeof(final_class), SEEK_CUR) + return final_class @classmethod def to_python(cls, ctype_object, *args, **kwargs): return bytes(ctype_object.payload), ctype_object.offset @classmethod - def from_python(cls, value): + def from_python(cls, stream, value): raise ParseError('Send unwrapped data.') -class CollectionObject(IgniteDataType): +class CollectionObject(IgniteDataType, Nullable): """ Similar to object array, but contains platform-agnostic deserialization type hint instead of type ID. @@ -260,20 +241,14 @@ def build_header(cls): ) @classmethod - def parse(cls, client: 'Client'): - tc_type = client.recv(ctypes.sizeof(ctypes.c_byte)) - - if tc_type == TC_NULL: - return Null.build_c_type(), tc_type - + def parse_not_null(cls, stream): header_class = cls.build_header() - buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type)) - header = header_class.from_buffer_copy(buffer) - fields = [] + header = stream.read_ctype(header_class) + stream.seek(ctypes.sizeof(header_class), SEEK_CUR) + fields = [] for i in range(header.length): - c_type, buffer_fragment = AnyDataObject.parse(client) - buffer += buffer_fragment + c_type = AnyDataObject.parse(stream) fields.append(('element_{}'.format(i), c_type)) final_class = type( @@ -284,7 +259,7 @@ def parse(cls, client: 'Client'): '_fields_': fields, } ) - return final_class, buffer + return final_class @classmethod def to_python(cls, ctype_object, *args, **kwargs): @@ -302,10 +277,7 @@ def to_python(cls, ctype_object, *args, **kwargs): return ctype_object.type, result @classmethod - def from_python(cls, value): - if value is None: - return Null.from_python() - + def from_python_not_null(cls, stream, value): type_or_id, value = value header_class = cls.build_header() header = header_class() @@ -320,14 +292,13 @@ def from_python(cls, value): length = 1 header.length = length header.type = type_or_id - buffer = bytearray(header) + stream.write(header) for x in value: - buffer += infer_from_python(x) - return bytes(buffer) + infer_from_python(stream, x) -class Map(IgniteDataType): +class Map(IgniteDataType, Nullable): """ Dictionary type, payload-only. @@ -358,20 +329,14 @@ def build_header(cls): ) @classmethod - def parse(cls, client: 'Client'): - tc_type = client.recv(ctypes.sizeof(ctypes.c_byte)) - - if tc_type == TC_NULL: - return Null.build_c_type(), tc_type - + def parse_not_null(cls, stream): header_class = cls.build_header() - buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type)) - header = header_class.from_buffer_copy(buffer) - fields = [] + header = stream.read_ctype(header_class) + stream.seek(ctypes.sizeof(header_class), SEEK_CUR) + fields = [] for i in range(header.length << 1): - c_type, buffer_fragment = AnyDataObject.parse(client) - buffer += buffer_fragment + c_type = AnyDataObject.parse(stream) fields.append(('element_{}'.format(i), c_type)) final_class = type( @@ -382,7 +347,7 @@ def parse(cls, client: 'Client'): '_fields_': fields, } ) - return final_class, buffer + return final_class @classmethod def to_python(cls, ctype_object, *args, **kwargs): @@ -402,7 +367,7 @@ def to_python(cls, ctype_object, *args, **kwargs): return result @classmethod - def from_python(cls, value, type_id=None): + def from_python(cls, stream, value, type_id=None): header_class = cls.build_header() header = header_class() length = len(value) @@ -414,12 +379,11 @@ def from_python(cls, value, type_id=None): ) if hasattr(header, 'type'): header.type = type_id - buffer = bytearray(header) + stream.write(header) for k, v in value.items(): - buffer += infer_from_python(k) - buffer += infer_from_python(v) - return bytes(buffer) + infer_from_python(stream, k) + infer_from_python(stream, v) class MapObject(Map): @@ -462,15 +426,16 @@ def to_python(cls, ctype_object, *args, **kwargs): ) @classmethod - def from_python(cls, value): + def from_python(cls, stream, value): if value is None: - return Null.from_python() + Null.from_python(stream) + return type_id, value = value - return super().from_python(value, type_id) + super().from_python(stream, value, type_id) -class BinaryObject(IgniteDataType): +class BinaryObject(IgniteDataType, Nullable): _type_id = TYPE_BINARY_OBJ type_code = TC_COMPLEX_OBJECT @@ -482,42 +447,14 @@ class BinaryObject(IgniteDataType): COMPACT_FOOTER = 0x0020 @staticmethod - def find_client(): - """ - A nice hack. Extracts the nearest `Client` instance from the - call stack. - """ - from pyignite import Client - from pyignite.connection import Connection - - frame = None - try: - for rec in inspect.stack()[2:]: - frame = rec[0] - code = frame.f_code - for varname in code.co_varnames: - suspect = frame.f_locals.get(varname) - if isinstance(suspect, Client): - return suspect - if isinstance(suspect, Connection): - return suspect.client - finally: - del frame - - @staticmethod - def hashcode( - value: object, client: 'Client' = None, *args, **kwargs - ) -> int: + def hashcode(value: object, client: None) -> int: # binary objects's hashcode implementation is special in the sense # that you need to fully serialize the object to calculate # its hashcode - if value._hashcode is None: + if not value._hashcode and client : - # …and for to serialize it you need a Client instance - if client is None: - client = BinaryObject.find_client() - - value._build(client) + with BinaryStream(client.random_node) as stream: + value._from_python(stream, save_to_buf=True) return value._hashcode @@ -565,41 +502,25 @@ def schema_type(cls, flags: int): }, ) - @staticmethod - def get_dataclass(conn: 'Connection', header) -> OrderedDict: - # get field names from outer space - result = conn.client.query_binary_type( - header.type_id, - header.schema_id - ) - if not result: - raise ParseError('Binary type is not registered') - return result - @classmethod - def parse(cls, client: 'Client'): + def parse_not_null(cls, stream): from pyignite.datatypes import Struct - tc_type = client.recv(ctypes.sizeof(ctypes.c_byte)) - - if tc_type == TC_NULL: - return Null.build_c_type(), tc_type header_class = cls.build_header() - buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type)) - header = header_class.from_buffer_copy(buffer) + header = stream.read_ctype(header_class) + stream.seek(ctypes.sizeof(header_class), SEEK_CUR) # ignore full schema, always retrieve fields' types and order # from complex types registry - data_class = cls.get_dataclass(client, header) + data_class = stream.get_dataclass(header) fields = data_class.schema.items() object_fields_struct = Struct(fields) - object_fields, object_fields_buffer = object_fields_struct.parse(client) - buffer += object_fields_buffer + object_fields = object_fields_struct.parse(stream) final_class_fields = [('object_fields', object_fields)] if header.flags & cls.HAS_SCHEMA: schema = cls.schema_type(header.flags) * len(fields) - buffer += client.recv(ctypes.sizeof(schema)) + stream.seek(ctypes.sizeof(schema), SEEK_CUR) final_class_fields.append(('schema', schema)) final_class = type( @@ -611,8 +532,8 @@ def parse(cls, client: 'Client'): } ) # register schema encoding approach - client.compact_footer = bool(header.flags & cls.COMPACT_FOOTER) - return final_class, buffer + stream.compact_footer = bool(header.flags & cls.COMPACT_FOOTER) + return final_class @classmethod def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs): @@ -642,23 +563,9 @@ def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs): return result @classmethod - def from_python(cls, value: object): - if value is None: - return Null.from_python() - - if getattr(value, '_buffer', None) is None: - client = cls.find_client() - - # if no client can be found, the class of the `value` is discarded - # and the new dataclass is automatically registered later on - if client: - client.register_binary_type(value.__class__) - else: - raise Warning( - 'Can not register binary type {}'.format(value.type_name) - ) - - # build binary representation - value._build(client) - - return value._buffer + def from_python_not_null(cls, stream, value): + stream.register_binary_type(value.__class__) + if getattr(value, '_buffer', None): + stream.write(value._buffer) + else: + value._from_python(stream) diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py index 23b9cc4..0111a22 100644 --- a/pyignite/datatypes/internal.py +++ b/pyignite/datatypes/internal.py @@ -17,6 +17,7 @@ import ctypes import decimal from datetime import date, datetime, timedelta +from io import SEEK_CUR from typing import Any, Tuple, Union, Callable import uuid @@ -33,6 +34,8 @@ 'infer_from_python', ] +from ..stream import READ_BACKWARD + def tc_map(key: bytes, _memo_map: dict = {}): """ @@ -119,11 +122,12 @@ def __init__(self, predicate1: Callable[[any], bool], predicate2: Callable[[any] self.var1 = var1 self.var2 = var2 - def parse(self, client: 'Client', context): - return self.var1.parse(client) if self.predicate1(context) else self.var2.parse(client) + def parse(self, stream, context): + return self.var1.parse(stream) if self.predicate1(context) else self.var2.parse(stream) def to_python(self, ctype_object, context, *args, **kwargs): - return self.var1.to_python(ctype_object, *args, **kwargs) if self.predicate2(context) else self.var2.to_python(ctype_object, *args, **kwargs) + return self.var1.to_python(ctype_object, *args, **kwargs) if self.predicate2(context)\ + else self.var2.to_python(ctype_object, *args, **kwargs) @attr.s class StructArray: @@ -144,14 +148,17 @@ def build_header_class(self): }, ) - def parse(self, client: 'Client'): - buffer = client.recv(ctypes.sizeof(self.counter_type)) - length = int.from_bytes(buffer, byteorder=PROTOCOL_BYTE_ORDER) - fields = [] + def parse(self, stream): + counter_type_len = ctypes.sizeof(self.counter_type) + length = int.from_bytes( + stream.mem_view(offset=counter_type_len), + byteorder=PROTOCOL_BYTE_ORDER + ) + stream.seek(counter_type_len, SEEK_CUR) + fields = [] for i in range(length): - c_type, buffer_fragment = Struct(self.following).parse(client) - buffer += buffer_fragment + c_type = Struct(self.following).parse(stream) fields.append(('element_{}'.format(i), c_type)) data_class = type( @@ -163,7 +170,7 @@ def parse(self, client: 'Client'): }, ) - return data_class, buffer + return data_class def to_python(self, ctype_object, *args, **kwargs): result = [] @@ -179,20 +186,19 @@ def to_python(self, ctype_object, *args, **kwargs): ) return result - def from_python(self, value): + def from_python(self, stream, value): length = len(value) header_class = self.build_header_class() header = header_class() header.length = length - buffer = bytearray(header) + + stream.write(header) for i, v in enumerate(value): for default_key, default_value in self.defaults.items(): v.setdefault(default_key, default_value) for name, el_class in self.following: - buffer += el_class.from_python(v[name]) - - return bytes(buffer) + el_class.from_python(stream, v[name]) @attr.s @@ -202,21 +208,13 @@ class Struct: dict_type = attr.ib(default=OrderedDict) defaults = attr.ib(type=dict, default={}) - def parse( - self, client: 'Client' - ) -> Tuple[ctypes.LittleEndianStructure, bytes]: - buffer = b'' - fields = [] - values = {} - + def parse(self, stream): + fields, values = [], {} for name, c_type in self.fields: is_cond = isinstance(c_type, Conditional) - c_type, buffer_fragment = c_type.parse(client, values) if is_cond else c_type.parse(client) - buffer += buffer_fragment - + c_type = c_type.parse(stream, values) if is_cond else c_type.parse(stream) fields.append((name, c_type)) - - values[name] = buffer_fragment + values[name] = stream.read_ctype(c_type, direction=READ_BACKWARD) data_class = type( 'Struct', @@ -227,7 +225,7 @@ def parse( }, ) - return data_class, buffer + return data_class def to_python( self, ctype_object, *args, **kwargs @@ -245,16 +243,12 @@ def to_python( ) return result - def from_python(self, value) -> bytes: - buffer = b'' - + def from_python(self, stream, value): for default_key, default_value in self.defaults.items(): value.setdefault(default_key, default_value) for name, el_class in self.fields: - buffer += el_class.from_python(value[name]) - - return buffer + el_class.from_python(stream, value[name]) class AnyDataObject: @@ -299,14 +293,13 @@ def get_subtype(iterable, allow_none=False): return type_first @classmethod - def parse(cls, client: 'Client'): - type_code = client.recv(ctypes.sizeof(ctypes.c_byte)) + def parse(cls, stream): + type_code = bytes(stream.mem_view(offset=ctypes.sizeof(ctypes.c_byte))) try: data_class = tc_map(type_code) except KeyError: raise ParseError('Unknown type code: `{}`'.format(type_code)) - client.prefetch += type_code - return data_class.parse(client) + return data_class.parse(stream) @classmethod def to_python(cls, ctype_object, *args, **kwargs): @@ -418,11 +411,12 @@ def map_python_type(cls, value): ) @classmethod - def from_python(cls, value): - return cls.map_python_type(value).from_python(value) + def from_python(cls, stream, value): + p_type = cls.map_python_type(value) + p_type.from_python(stream, value) -def infer_from_python(value: Any): +def infer_from_python(stream, value: Any): """ Convert pythonic value to ctypes buffer, type hint-aware. @@ -433,7 +427,8 @@ def infer_from_python(value: Any): value, data_type = value else: data_type = AnyDataObject - return data_type.from_python(value) + + data_type.from_python(stream, value) @attr.s @@ -455,15 +450,14 @@ def build_header(self): } ) - def parse(self, client: 'Client'): + def parse(self, stream): header_class = self.build_header() - buffer = client.recv(ctypes.sizeof(header_class)) - header = header_class.from_buffer_copy(buffer) - fields = [] + header = stream.read_ctype(header_class) + stream.seek(ctypes.sizeof(header_class), SEEK_CUR) + fields = [] for i in range(header.length): - c_type, buffer_fragment = super().parse(client) - buffer += buffer_fragment + c_type = super().parse(stream) fields.append(('element_{}'.format(i), c_type)) final_class = type( @@ -474,7 +468,7 @@ def parse(self, client: 'Client'): '_fields_': fields, } ) - return final_class, buffer + return final_class @classmethod def to_python(cls, ctype_object, *args, **kwargs): @@ -491,7 +485,7 @@ def to_python(cls, ctype_object, *args, **kwargs): ) return result - def from_python(self, value): + def from_python(self, stream, value): header_class = self.build_header() header = header_class() @@ -501,8 +495,7 @@ def from_python(self, value): value = [value] length = 1 header.length = length - buffer = bytearray(header) + stream.write(header) for x in value: - buffer += infer_from_python(x) - return bytes(buffer) + infer_from_python(stream, x) diff --git a/pyignite/datatypes/null_object.py b/pyignite/datatypes/null_object.py index 19b41c7..912ded8 100644 --- a/pyignite/datatypes/null_object.py +++ b/pyignite/datatypes/null_object.py @@ -20,6 +20,7 @@ """ import ctypes +from io import SEEK_CUR from typing import Any from .base import IgniteDataType @@ -28,6 +29,8 @@ __all__ = ['Null'] +from ..constants import PROTOCOL_BYTE_ORDER + class Null(IgniteDataType): default = None @@ -55,16 +58,56 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def parse(cls, client: 'Client'): - buffer = client.recv(ctypes.sizeof(ctypes.c_byte)) - data_type = cls.build_c_type() - return data_type, buffer + def parse(cls, stream): + init_pos, offset = stream.tell(), ctypes.sizeof(ctypes.c_byte) + stream.seek(offset, SEEK_CUR) + return cls.build_c_type() @staticmethod def to_python(*args, **kwargs): return None @staticmethod - def from_python(*args): - return TC_NULL + def from_python(stream, *args): + stream.write(TC_NULL) + + +class Nullable: + @classmethod + def parse_not_null(cls, stream): + raise NotImplementedError + + @classmethod + def parse(cls, stream): + type_len = ctypes.sizeof(ctypes.c_byte) + + if stream.mem_view(offset=type_len) == TC_NULL: + stream.seek(type_len, SEEK_CUR) + return Null.build_c_type() + + return cls.parse_not_null(stream) + @classmethod + def to_python_not_null(cls, ctypes_object, *args, **kwargs): + raise NotImplementedError + + @classmethod + def to_python(cls, ctypes_object, *args, **kwargs): + if ctypes_object.type_code == int.from_bytes( + TC_NULL, + byteorder=PROTOCOL_BYTE_ORDER + ): + return None + + return cls.to_python_not_null(ctypes_object, *args, **kwargs) + + @classmethod + def from_python_not_null(cls, stream, value): + raise NotImplementedError + + @classmethod + def from_python(cls, stream, value): + if value is None: + Null.from_python(stream) + else: + cls.from_python_not_null(stream, value) diff --git a/pyignite/datatypes/primitive.py b/pyignite/datatypes/primitive.py index d549fda..ffa2e32 100644 --- a/pyignite/datatypes/primitive.py +++ b/pyignite/datatypes/primitive.py @@ -15,7 +15,7 @@ import ctypes import struct -import sys +from io import SEEK_CUR from pyignite.constants import * from .base import IgniteDataType @@ -47,8 +47,10 @@ class Primitive(IgniteDataType): c_type = None @classmethod - def parse(cls, client: 'Client'): - return cls.c_type, client.recv(ctypes.sizeof(cls.c_type)) + def parse(cls, stream): + init_pos, offset = stream.tell(), ctypes.sizeof(cls.c_type) + stream.seek(offset, SEEK_CUR) + return cls.c_type @classmethod def to_python(cls, ctype_object, *args, **kwargs): @@ -61,8 +63,8 @@ class Byte(Primitive): c_type = ctypes.c_byte @classmethod - def from_python(cls, value): - return struct.pack(" 0: + @classmethod + def to_python_not_null(cls, ctype_object, *args, **kwargs): + if ctype_object.length > 0: return ctype_object.data.decode(PROTOCOL_STRING_ENCODING) - else: - return '' - @classmethod - def from_python(cls, value): - if value is None: - return Null.from_python() + return '' + @classmethod + def from_python_not_null(cls, stream, value): if isinstance(value, str): value = value.encode(PROTOCOL_STRING_ENCODING) length = len(value) @@ -135,10 +120,11 @@ def from_python(cls, value): ) data_object.length = length data_object.data = value - return bytes(data_object) + + stream.write(data_object) -class DecimalObject(IgniteDataType): +class DecimalObject(IgniteDataType, Nullable): _type_name = NAME_DECIMAL _type_id = TYPE_DECIMAL type_code = TC_DECIMAL @@ -165,18 +151,10 @@ def build_c_header(cls): ) @classmethod - def parse(cls, client: 'Client'): - tc_type = client.recv(ctypes.sizeof(ctypes.c_byte)) - # Decimal or Null - if tc_type == TC_NULL: - return Null.build_c_type(), tc_type - + def parse_not_null(cls, stream): header_class = cls.build_c_header() - buffer = tc_type + client.recv( - ctypes.sizeof(header_class) - - len(tc_type) - ) - header = header_class.from_buffer_copy(buffer) + header = stream.read_ctype(header_class) + data_type = type( cls.__name__, (header_class,), @@ -187,17 +165,12 @@ def parse(cls, client: 'Client'): ], } ) - buffer += client.recv( - ctypes.sizeof(data_type) - - ctypes.sizeof(header_class) - ) - return data_type, buffer - @classmethod - def to_python(cls, ctype_object, *args, **kwargs): - if getattr(ctype_object, 'length', None) is None: - return None + stream.seek(ctypes.sizeof(data_type), SEEK_CUR) + return data_type + @classmethod + def to_python_not_null(cls, ctype_object, *args, **kwargs): sign = 1 if ctype_object.data[0] & 0x80 else 0 data = ctype_object.data[1:] data.insert(0, ctype_object.data[0] & 0x7f) @@ -218,10 +191,7 @@ def to_python(cls, ctype_object, *args, **kwargs): return result @classmethod - def from_python(cls, value: decimal.Decimal): - if value is None: - return Null.from_python() - + def from_python_not_null(cls, stream, value: decimal.Decimal): sign, digits, scale = value.normalize().as_tuple() integer = int(''.join([str(d) for d in digits])) # calculate number of bytes (at least one, and not forget the sign bit) @@ -257,7 +227,8 @@ def from_python(cls, value: decimal.Decimal): data_object.scale = -scale for i in range(length): data_object.data[i] = data[i] - return bytes(data_object) + + stream.write(data_object) class UUIDObject(StandardObject): @@ -300,10 +271,7 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def from_python(cls, value: uuid.UUID): - if value is None: - return Null.from_python() - + def from_python_not_null(cls, stream, value: uuid.UUID): data_type = cls.build_c_type() data_object = data_type() data_object.type_code = int.from_bytes( @@ -312,15 +280,11 @@ def from_python(cls, value: uuid.UUID): ) for i, byte in zip(cls.UUID_BYTE_ORDER, bytearray(value.bytes)): data_object.value[i] = byte - return bytes(data_object) + + stream.write(data_object) @classmethod - def to_python(cls, ctypes_object, *args, **kwargs): - if ctypes_object.type_code == int.from_bytes( - TC_NULL, - byteorder=PROTOCOL_BYTE_ORDER - ): - return None + def to_python_not_null(cls, ctypes_object, *args, **kwargs): uuid_array = bytearray(ctypes_object.value) return uuid.UUID( bytes=bytes([uuid_array[i] for i in cls.UUID_BYTE_ORDER]) @@ -367,9 +331,7 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def from_python(cls, value: tuple): - if value is None: - return Null.from_python() + def from_python_not_null(cls, stream, value: tuple): data_type = cls.build_c_type() data_object = data_type() data_object.type_code = int.from_bytes( @@ -378,15 +340,11 @@ def from_python(cls, value: tuple): ) data_object.epoch = int(value[0].timestamp() * 1000) data_object.fraction = value[1] - return bytes(data_object) + + stream.write(data_object) @classmethod - def to_python(cls, ctypes_object, *args, **kwargs): - if ctypes_object.type_code == int.from_bytes( - TC_NULL, - byteorder=PROTOCOL_BYTE_ORDER - ): - return None + def to_python_not_null(cls, ctypes_object, *args, **kwargs): return ( datetime.fromtimestamp(ctypes_object.epoch/1000), ctypes_object.fraction @@ -428,9 +386,7 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def from_python(cls, value: [date, datetime]): - if value is None: - return Null.from_python() + def from_python_not_null(cls, stream, value: [date, datetime]): if type(value) is date: value = datetime.combine(value, time()) data_type = cls.build_c_type() @@ -440,15 +396,11 @@ def from_python(cls, value: [date, datetime]): byteorder=PROTOCOL_BYTE_ORDER ) data_object.epoch = int(value.timestamp() * 1000) - return bytes(data_object) + + stream.write(data_object) @classmethod - def to_python(cls, ctypes_object, *args, **kwargs): - if ctypes_object.type_code == int.from_bytes( - TC_NULL, - byteorder=PROTOCOL_BYTE_ORDER - ): - return None + def to_python_not_null(cls, ctypes_object, *args, **kwargs): return datetime.fromtimestamp(ctypes_object.epoch/1000) @@ -486,9 +438,7 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def from_python(cls, value: timedelta): - if value is None: - return Null.from_python() + def from_python_not_null(cls, stream, value: timedelta): data_type = cls.build_c_type() data_object = data_type() data_object.type_code = int.from_bytes( @@ -496,15 +446,11 @@ def from_python(cls, value: timedelta): byteorder=PROTOCOL_BYTE_ORDER ) data_object.value = int(value.total_seconds() * 1000) - return bytes(data_object) + + stream.write(data_object) @classmethod - def to_python(cls, ctypes_object, *args, **kwargs): - if ctypes_object.type_code == int.from_bytes( - TC_NULL, - byteorder=PROTOCOL_BYTE_ORDER - ): - return None + def to_python_not_null(cls, ctypes_object, *args, **kwargs): return timedelta(milliseconds=ctypes_object.value) @@ -539,10 +485,7 @@ def build_c_type(cls): return cls._object_c_type @classmethod - def from_python(cls, value: tuple): - if value is None: - return Null.from_python() - + def from_python_not_null(cls, stream, value: tuple): data_type = cls.build_c_type() data_object = data_type() data_object.type_code = int.from_bytes( @@ -550,15 +493,11 @@ def from_python(cls, value: tuple): byteorder=PROTOCOL_BYTE_ORDER ) data_object.type_id, data_object.ordinal = value - return bytes(data_object) + + stream.write(data_object) @classmethod - def to_python(cls, ctypes_object, *args, **kwargs): - if ctypes_object.type_code == int.from_bytes( - TC_NULL, - byteorder=PROTOCOL_BYTE_ORDER - ): - return None + def to_python_not_null(cls, ctypes_object, *args, **kwargs): return ctypes_object.type_id, ctypes_object.ordinal @@ -571,7 +510,7 @@ class BinaryEnumObject(EnumObject): type_code = TC_BINARY_ENUM -class StandardArray(IgniteDataType): +class StandardArray(IgniteDataType, Nullable): """ Base class for array of primitives. Payload-only. """ @@ -599,19 +538,14 @@ def build_header_class(cls): ) @classmethod - def parse(cls, client: 'Client'): - tc_type = client.recv(ctypes.sizeof(ctypes.c_byte)) - - if tc_type == TC_NULL: - return Null.build_c_type(), tc_type - + def parse_not_null(cls, stream): header_class = cls.build_header_class() - buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type)) - header = header_class.from_buffer_copy(buffer) + header = stream.read_ctype(header_class) + stream.seek(ctypes.sizeof(header_class), SEEK_CUR) + fields = [] for i in range(header.length): - c_type, buffer_fragment = cls.standard_type.parse(client) - buffer += buffer_fragment + c_type = cls.standard_type.parse(stream) fields.append(('element_{}'.format(i), c_type)) final_class = type( @@ -622,14 +556,15 @@ def parse(cls, client: 'Client'): '_fields_': fields, } ) - return final_class, buffer + return final_class @classmethod def to_python(cls, ctype_object, *args, **kwargs): - result = [] length = getattr(ctype_object, "length", None) if length is None: return None + + result = [] for i in range(length): result.append( cls.standard_type.to_python( @@ -640,9 +575,7 @@ def to_python(cls, ctype_object, *args, **kwargs): return result @classmethod - def from_python(cls, value): - if value is None: - return Null.from_python() + def from_python_not_null(cls, stream, value): header_class = cls.build_header_class() header = header_class() if hasattr(header, 'type_code'): @@ -652,11 +585,10 @@ def from_python(cls, value): ) length = len(value) header.length = length - buffer = bytearray(header) + stream.write(header) for x in value: - buffer += cls.standard_type.from_python(x) - return bytes(buffer) + cls.standard_type.from_python(stream, x) class StringArray(StandardArray): @@ -804,10 +736,7 @@ def build_header_class(cls): ) @classmethod - def from_python(cls, value): - if value is None: - return Null.from_python() - + def from_python_not_null(cls, stream, value): type_id, value = value header_class = cls.build_header_class() header = header_class() @@ -819,11 +748,10 @@ def from_python(cls, value): length = len(value) header.length = length header.type_id = type_id - buffer = bytearray(header) + stream.write(header) for x in value: - buffer += cls.standard_type.from_python(x) - return bytes(buffer) + cls.standard_type.from_python(stream, x) @classmethod def to_python(cls, ctype_object, *args, **kwargs): diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py index 69b6fa2..5bd114b 100644 --- a/pyignite/queries/query.py +++ b/pyignite/queries/query.py @@ -21,6 +21,7 @@ from pyignite.connection import Connection from pyignite.constants import MIN_LONG, MAX_LONG, RHF_TOPOLOGY_CHANGED from pyignite.queries.response import Response, SQLResponse +from pyignite.stream import BinaryStream, READ_BACKWARD @attr.s @@ -47,31 +48,28 @@ def build_c_type(cls): ) return cls._query_c_type - def _build_header(self, buffer: bytearray, values: dict): + def _build_header(self, stream, values: dict): header_class = self.build_c_type() + header_len = ctypes.sizeof(header_class) + init_pos = stream.tell() + stream.seek(init_pos + header_len) + header = header_class() header.op_code = self.op_code if self.query_id is None: header.query_id = randint(MIN_LONG, MAX_LONG) for name, c_type in self.following: - buffer += c_type.from_python(values[name]) + c_type.from_python(stream, values[name]) - header.length = ( - len(buffer) - + ctypes.sizeof(header_class) - - ctypes.sizeof(ctypes.c_int) - ) + header.length = stream.tell() - init_pos - ctypes.sizeof(ctypes.c_int) + stream.seek(init_pos) return header - def from_python(self, values: dict = None): - if values is None: - values = {} - buffer = bytearray() - header = self._build_header(buffer, values) - buffer[:0] = bytes(header) - return header.query_id, bytes(buffer) + def from_python(self, stream, values: dict = None): + header = self._build_header(stream, values if values else {}) + stream.write(header) def perform( self, conn: Connection, query_params: dict = None, @@ -89,8 +87,9 @@ def perform( :return: instance of :class:`~pyignite.api.result.APIResult` with raw value (may undergo further processing in API functions). """ - _, send_buffer = self.from_python(query_params) - conn.send(send_buffer) + with BinaryStream(conn) as stream: + self.from_python(stream, query_params) + conn.send(stream.getbuffer()) if sql: response_struct = SQLResponse(protocol_version=conn.get_protocol_version(), @@ -99,8 +98,9 @@ def perform( response_struct = Response(protocol_version=conn.get_protocol_version(), following=response_config) - response_ctype, recv_buffer = response_struct.parse(conn) - response = response_ctype.from_buffer_copy(recv_buffer) + with BinaryStream(conn, conn.recv()) as stream: + response_ctype = response_struct.parse(stream) + response = stream.read_ctype(response_ctype, direction=READ_BACKWARD) # this test depends on protocol version if getattr(response, 'flags', False) & RHF_TOPOLOGY_CHANGED: @@ -140,7 +140,7 @@ def build_c_type(cls): ) return cls._query_c_type - def _build_header(self, buffer: bytearray, values: dict): - header = super()._build_header(buffer, values) + def _build_header(self, stream, values: dict): + header = super()._build_header(stream, values) header.config_length = header.length - ctypes.sizeof(type(header)) return header diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py index 05a519a..016f577 100644 --- a/pyignite/queries/response.py +++ b/pyignite/queries/response.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from io import SEEK_CUR import attr from collections import OrderedDict @@ -21,6 +22,7 @@ from pyignite.connection import Connection from pyignite.datatypes import AnyDataObject, Bool, Int, Long, String, StringArray, Struct from pyignite.queries.op_codes import OP_SUCCESS +from pyignite.stream import READ_BACKWARD @attr.s @@ -55,12 +57,14 @@ def build_header(self): ) return self._response_header - def parse(self, conn: Connection): + def parse(self, stream): + init_pos = stream.tell() header_class = self.build_header() - buffer = bytearray(conn.recv(ctypes.sizeof(header_class))) - header = header_class.from_buffer_copy(buffer) - fields = [] + header_len = ctypes.sizeof(header_class) + header = stream.read_ctype(header_class) + stream.seek(header_len, SEEK_CUR) + fields = [] has_error = False if self.protocol_version and self.protocol_version >= (1, 4, 0): if header.flags & RHF_TOPOLOGY_CHANGED: @@ -76,20 +80,19 @@ def parse(self, conn: Connection): has_error = header.status_code != OP_SUCCESS if fields: - buffer += conn.recv( - sum([ctypes.sizeof(c_type) for _, c_type in fields]) - ) + stream.seek(sum(ctypes.sizeof(c_type) for _, c_type in fields), SEEK_CUR) if has_error: - msg_type, buffer_fragment = String.parse(conn) - buffer += buffer_fragment + msg_type = String.parse(stream) fields.append(('error_message', msg_type)) else: - self._parse_success(conn, buffer, fields) + self._parse_success(stream, fields) - return self._create_parse_result(conn, header_class, fields, buffer) + response_class = self._create_response_class(stream, header_class, fields) + stream.seek(init_pos + ctypes.sizeof(response_class)) + return self._create_response_class(stream, header_class, fields) - def _create_parse_result(self, conn: Connection, header_class, fields: list, buffer: bytearray): + def _create_response_class(self, stream, header_class, fields: list): response_class = type( 'Response', (header_class,), @@ -98,12 +101,11 @@ def _create_parse_result(self, conn: Connection, header_class, fields: list, buf '_fields_': fields, } ) - return response_class, bytes(buffer) + return response_class - def _parse_success(self, conn: Connection, buffer: bytearray, fields: list): + def _parse_success(self, stream, fields: list): for name, ignite_type in self.following: - c_type, buffer_fragment = ignite_type.parse(conn) - buffer += buffer_fragment + c_type = ignite_type.parse(stream) fields.append((name, c_type)) def to_python(self, ctype_object, *args, **kwargs): @@ -134,7 +136,7 @@ def fields_or_field_count(self): return 'fields', StringArray return 'field_count', Int - def _parse_success(self, conn: Connection, buffer: bytearray, fields: list): + def _parse_success(self, stream, fields: list): following = [ self.fields_or_field_count(), ('row_count', Int), @@ -142,9 +144,8 @@ def _parse_success(self, conn: Connection, buffer: bytearray, fields: list): if self.has_cursor: following.insert(0, ('cursor', Long)) body_struct = Struct(following) - body_class, body_buffer = body_struct.parse(conn) - body = body_class.from_buffer_copy(body_buffer) - buffer += body_buffer + body_class = body_struct.parse(stream) + body = stream.read_ctype(body_class, direction=READ_BACKWARD) if self.include_field_names: field_count = body.fields.length @@ -155,9 +156,8 @@ def _parse_success(self, conn: Connection, buffer: bytearray, fields: list): for i in range(body.row_count): row_fields = [] for j in range(field_count): - field_class, field_buffer = AnyDataObject.parse(conn) + field_class = AnyDataObject.parse(stream) row_fields.append(('column_{}'.format(j), field_class)) - buffer += field_buffer row_class = type( 'SQLResponseRow', @@ -182,7 +182,7 @@ def _parse_success(self, conn: Connection, buffer: bytearray, fields: list): ('more', ctypes.c_byte), ] - def _create_parse_result(self, conn: Connection, header_class, fields: list, buffer: bytearray): + def _create_response_class(self, stream, header_class, fields: list): final_class = type( 'SQLResponse', (header_class,), @@ -191,8 +191,7 @@ def _create_parse_result(self, conn: Connection, header_class, fields: list, buf '_fields_': fields, } ) - buffer += conn.recv(ctypes.sizeof(final_class) - len(buffer)) - return final_class, bytes(buffer) + return final_class def to_python(self, ctype_object, *args, **kwargs): if getattr(ctype_object, 'status_code', 0) == 0: diff --git a/pyignite/stream/__init__.py b/pyignite/stream/__init__.py new file mode 100644 index 0000000..94153b4 --- /dev/null +++ b/pyignite/stream/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .binary_stream import BinaryStream, READ_FORWARD, READ_BACKWARD \ No newline at end of file diff --git a/pyignite/stream/binary_stream.py b/pyignite/stream/binary_stream.py new file mode 100644 index 0000000..1ecdcfb --- /dev/null +++ b/pyignite/stream/binary_stream.py @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import ctypes +from io import BytesIO + +import pyignite.utils as ignite_utils + +READ_FORWARD = 0 +READ_BACKWARD = 1 + + +class BinaryStream: + def __init__(self, conn, buf=None): + """ + Initialize binary stream around buffers. + + :param buf: Buffer, optional parameter. If not passed, creates empty BytesIO. + :param conn: Connection instance, required. + """ + from pyignite.connection import Connection + + if not isinstance(conn, Connection): + raise TypeError(f"invalid parameter: expected instance of {Connection}") + + if buf and not isinstance(buf, (bytearray, bytes, memoryview)): + raise TypeError(f"invalid parameter: expected bytes-like object") + + self.conn = conn + self.stream = BytesIO(buf) if buf else BytesIO() + + @property + def compact_footer(self) -> bool: + return self.conn.client.compact_footer + + @compact_footer.setter + def compact_footer(self, value: bool): + self.conn.client.compact_footer = value + + def read(self, size): + buf = bytearray(size) + self.stream.readinto(buf) + return buf + + def read_ctype(self, ctype_class, position=None, direction=READ_FORWARD): + ctype_len = ctypes.sizeof(ctype_class) + + if position is not None and position >= 0: + init_position = position + else: + init_position = self.tell() + + if direction == READ_FORWARD: + start, end = init_position, init_position + ctype_len + else: + start, end = init_position - ctype_len, init_position + + buf = self.stream.getbuffer()[start:end] + return ctype_class.from_buffer_copy(buf) + + def write(self, buf): + return self.stream.write(buf) + + def tell(self): + return self.stream.tell() + + def seek(self, *args, **kwargs): + return self.stream.seek(*args, **kwargs) + + def getvalue(self): + return self.stream.getvalue() + + def getbuffer(self): + return self.stream.getbuffer() + + def mem_view(self, start=-1, offset=0): + start = start if start >= 0 else self.tell() + return self.stream.getbuffer()[start:start+offset] + + def hashcode(self, start, bytes_len): + return ignite_utils.hashcode(self.stream.getbuffer()[start:start+bytes_len]) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.stream.close() + + def get_dataclass(self, header): + # get field names from outer space + result = self.conn.client.query_binary_type( + header.type_id, + header.schema_id + ) + if not result: + raise RuntimeError('Binary type is not registered') + return result + + def register_binary_type(self, *args, **kwargs): + return self.conn.client.register_binary_type(*args, **kwargs) diff --git a/pyignite/utils.py b/pyignite/utils.py index ef7b6f6..3d0378f 100644 --- a/pyignite/utils.py +++ b/pyignite/utils.py @@ -19,7 +19,7 @@ from functools import wraps from threading import Event, Thread -from typing import Any, Callable, Optional, Type, Tuple, Union +from typing import Any, Optional, Type, Tuple, Union from pyignite.datatypes.base import IgniteDataType from .constants import * @@ -85,29 +85,7 @@ def int_overflow(value: int) -> int: return ((value ^ 0x80000000) & 0xffffffff) - 0x80000000 -def unwrap_binary(client: 'Client', wrapped: tuple) -> object: - """ - Unwrap wrapped BinaryObject and convert it to Python data. - - :param client: connection to Ignite cluster, - :param wrapped: `WrappedDataObject` value, - :return: dict representing wrapped BinaryObject. - """ - from pyignite.datatypes.complex import BinaryObject - - blob, offset = wrapped - conn_clone = client.random_node.clone(prefetch=blob) - conn_clone.pos = offset - data_class, data_bytes = BinaryObject.parse(conn_clone) - result = BinaryObject.to_python( - data_class.from_buffer_copy(data_bytes), - client, - ) - conn_clone.close() - return result - - -def hashcode(data: Union[str, bytes]) -> int: +def hashcode(data: Union[str, bytes, bytearray, memoryview]) -> int: """ Calculate hash code used for identifying objects in Ignite binary API. diff --git a/requirements/tests.txt b/requirements/tests.txt index 327f501..893928e 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ -4,3 +4,4 @@ pytest==3.6.1 pytest-cov==2.5.1 teamcity-messages==1.21 psutil==5.6.5 +jinja2==2.11.3 diff --git a/tests/config/ignite-config-ssl.xml b/tests/config/ignite-config-ssl.xml deleted file mode 100644 index 827405c..0000000 --- a/tests/config/ignite-config-ssl.xml +++ /dev/null @@ -1,51 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/tests/config/ignite-config.xml b/tests/config/ignite-config.xml deleted file mode 100644 index 09fba2c..0000000 --- a/tests/config/ignite-config.xml +++ /dev/null @@ -1,39 +0,0 @@ - - - - - - - - - - - - - - - - - diff --git a/tests/config/ignite-config-base.xml b/tests/config/ignite-config.xml.jinja2 similarity index 65% rename from tests/config/ignite-config-base.xml rename to tests/config/ignite-config.xml.jinja2 index 7487618..322a958 100644 --- a/tests/config/ignite-config-base.xml +++ b/tests/config/ignite-config.xml.jinja2 @@ -26,12 +26,35 @@ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> - - - - + + {% if use_ssl %} + + {% endif %} + + + + + + + {% if use_ssl %} + + + + + + + + + + + + + {% endif %} + + + + - @@ -42,7 +65,7 @@ - 127.0.0.1:48500..48503 + 127.0.0.1:48500..48510 @@ -69,9 +92,9 @@ - + - + diff --git a/tests/config/log4j.xml b/tests/config/log4j.xml.jinja2 similarity index 90% rename from tests/config/log4j.xml rename to tests/config/log4j.xml.jinja2 index f5562d0..628f66c 100644 --- a/tests/config/log4j.xml +++ b/tests/config/log4j.xml.jinja2 @@ -23,8 +23,8 @@ + filePattern="logs/ignite-log-{{ ignite_instance_idx }}-%i.txt" + fileName="logs/ignite-log-{{ ignite_instance_idx }}.txt"> diff --git a/tests/conftest.py b/tests/conftest.py index 9974b16..54a7fda 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,7 +22,7 @@ from pyignite import Client from pyignite.constants import * from pyignite.api import cache_create, cache_destroy -from tests.util import _start_ignite, start_ignite_gen, get_request_grid_idx +from tests.util import _start_ignite, start_ignite_gen class BoolParser(argparse.Action): @@ -134,12 +134,6 @@ def cache(client): cache_destroy(conn, cache_name) -@pytest.fixture(autouse=True) -def log_init(): - # Init log call timestamp - get_request_grid_idx() - - @pytest.fixture(scope='module') def start_client(use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers, ssl_version,username, password): diff --git a/tests/test_affinity_request_routing.py b/tests/test_affinity_request_routing.py index eb46ab6..cd0c015 100644 --- a/tests/test_affinity_request_routing.py +++ b/tests/test_affinity_request_routing.py @@ -13,18 +13,56 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import OrderedDict - +from collections import OrderedDict, deque import pytest from pyignite import * +from pyignite.connection import Connection from pyignite.datatypes import * from pyignite.datatypes.cache_config import CacheMode from pyignite.datatypes.prop_codes import * from tests.util import * -@pytest.mark.parametrize("key,grid_idx", [(1, 3), (2, 1), (3, 1), (4, 3), (5, 1), (6, 3), (11, 2), (13, 2), (19, 2)]) +requests = deque() +old_send = Connection.send + + +def patched_send(self, *args, **kwargs): + """Patched send function that push to queue idx of server to which request is routed.""" + requests.append(self.port % 100) + return old_send(self, *args, **kwargs) + + +def setup_function(): + requests.clear() + Connection.send = patched_send + + +def teardown_function(): + Connection.send = old_send + + +def wait_for_affinity_distribution(cache, key, node_idx, timeout=30): + real_node_idx = 0 + + def check_grid_idx(): + nonlocal real_node_idx + try: + cache.get(key) + real_node_idx = requests.pop() + except (OSError, IOError): + return False + return real_node_idx == node_idx + + res = wait_for_condition(check_grid_idx, timeout=timeout) + + if not res: + raise TimeoutError(f"failed to wait for affinity distribution, expected node_idx {node_idx}," + f"got {real_node_idx} instead") + + +@pytest.mark.parametrize("key,grid_idx", [(1, 1), (2, 2), (3, 3), (4, 1), (5, 1), (6, 2), (11, 1), (13, 1), (19, 1)]) @pytest.mark.parametrize("backups", [0, 1, 2, 3]) def test_cache_operation_on_primitive_key_routes_request_to_primary_node( request, key, grid_idx, backups, client_partition_aware): @@ -34,52 +72,51 @@ def test_cache_operation_on_primitive_key_routes_request_to_primary_node( PROP_BACKUPS_NUMBER: backups, }) - # Warm up affinity map cache.put(key, key) - get_request_grid_idx() + wait_for_affinity_distribution(cache, key, grid_idx) # Test cache.get(key) - assert get_request_grid_idx() == grid_idx + assert requests.pop() == grid_idx cache.put(key, key) - assert get_request_grid_idx("Put") == grid_idx + assert requests.pop() == grid_idx cache.replace(key, key + 1) - assert get_request_grid_idx("Replace") == grid_idx + assert requests.pop() == grid_idx cache.clear_key(key) - assert get_request_grid_idx("ClearKey") == grid_idx + assert requests.pop() == grid_idx cache.contains_key(key) - assert get_request_grid_idx("ContainsKey") == grid_idx + assert requests.pop() == grid_idx cache.get_and_put(key, 3) - assert get_request_grid_idx("GetAndPut") == grid_idx + assert requests.pop() == grid_idx cache.get_and_put_if_absent(key, 4) - assert get_request_grid_idx("GetAndPutIfAbsent") == grid_idx + assert requests.pop() == grid_idx cache.put_if_absent(key, 5) - assert get_request_grid_idx("PutIfAbsent") == grid_idx + assert requests.pop() == grid_idx cache.get_and_remove(key) - assert get_request_grid_idx("GetAndRemove") == grid_idx + assert requests.pop() == grid_idx cache.get_and_replace(key, 6) - assert get_request_grid_idx("GetAndReplace") == grid_idx + assert requests.pop() == grid_idx cache.remove_key(key) - assert get_request_grid_idx("RemoveKey") == grid_idx + assert requests.pop() == grid_idx cache.remove_if_equals(key, -1) - assert get_request_grid_idx("RemoveIfEquals") == grid_idx + assert requests.pop() == grid_idx cache.replace(key, -1) - assert get_request_grid_idx("Replace") == grid_idx + assert requests.pop() == grid_idx cache.replace_if_equals(key, 10, -10) - assert get_request_grid_idx("ReplaceIfEquals") == grid_idx + assert requests.pop() == grid_idx @pytest.mark.skip(reason="Custom key objects are not supported yet") @@ -121,31 +158,28 @@ class AffinityTestType1( cache.put(key_obj, 1) cache.put(key_obj, 2) - assert get_request_grid_idx("Put") == grid_idx + assert requests.pop() == grid_idx -@pytest.mark.skip("https://issues.apache.org/jira/browse/IGNITE-13967") def test_cache_operation_routed_to_new_cluster_node(request, start_ignite_server, start_client): client = start_client(partition_aware=True) client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802), ("127.0.0.1", 10803), ("127.0.0.1", 10804)]) cache = client.get_or_create_cache(request.node.name) key = 12 + wait_for_affinity_distribution(cache, key, 3) cache.put(key, key) cache.put(key, key) - assert get_request_grid_idx("Put") == 3 + assert requests.pop() == 3 srv = start_ignite_server(4) try: # Wait for rebalance and partition map exchange - def check_grid_idx(): - cache.get(key) - return get_request_grid_idx() == 4 - wait_for_condition(check_grid_idx) + wait_for_affinity_distribution(cache, key, 4) # Response is correct and comes from the new node res = cache.get_and_remove(key) assert res == key - assert get_request_grid_idx("GetAndRemove") == 4 + assert requests.pop() == 4 finally: kill_process_tree(srv.pid) @@ -167,13 +201,13 @@ def verify_random_node(cache): key = 1 cache.put(key, key) - idx1 = get_request_grid_idx("Put") + idx1 = requests.pop() idx2 = idx1 # Try 10 times - random node may end up being the same for _ in range(1, 10): cache.put(key, key) - idx2 = get_request_grid_idx("Put") + idx2 = requests.pop() if idx2 != idx1: break assert idx1 != idx2 diff --git a/tests/test_affinity_single_connection.py b/tests/test_affinity_single_connection.py index c40393c..1943384 100644 --- a/tests/test_affinity_single_connection.py +++ b/tests/test_affinity_single_connection.py @@ -13,10 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import pytest - -from tests.util import get_request_grid_idx - def test_all_cache_operations_with_partition_aware_client_on_single_server(request, client_partition_aware_single_server): cache = client_partition_aware_single_server.get_or_create_cache(request.node.name) diff --git a/tests/test_cache_composite_key_class_sql.py b/tests/test_cache_composite_key_class_sql.py index 2f1705f..989a229 100644 --- a/tests/test_cache_composite_key_class_sql.py +++ b/tests/test_cache_composite_key_class_sql.py @@ -111,13 +111,12 @@ def test_python_sql_finds_inserted_value_with_composite_key(client): def validate_query_result(student_key, student_val, query_result): - ''' + """ Compare query result with expected key and value. - ''' + """ assert len(query_result) == 2 sql_row = dict(zip(query_result[0], query_result[1])) - assert sql_row["_KEY"][0] == student_key._buffer assert sql_row['ID'] == student_key.ID assert sql_row['DEPT'] == student_key.DEPT assert sql_row['NAME'] == student_val.NAME diff --git a/tests/test_sql.py b/tests/test_sql.py index 15f84ee..c896afb 100644 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -22,7 +22,9 @@ ) from pyignite.datatypes.prop_codes import * from pyignite.exceptions import SQLError -from pyignite.utils import entity_id, unwrap_binary +from pyignite.utils import entity_id +from pyignite.binary import unwrap_binary + initial_data = [ ('John', 'Doe', 5), diff --git a/tests/util.py b/tests/util.py index 1d6acd6..90f0146 100644 --- a/tests/util.py +++ b/tests/util.py @@ -15,6 +15,8 @@ import glob import os + +import jinja2 as jinja2 import psutil import re import signal @@ -72,22 +74,19 @@ def get_ignite_config_path(use_ssl=False): if use_ssl: file_name = "ignite-config-ssl.xml" else: - file_name = "ignite-config.xml" + file_name = "ignite-config.xml.jinja2" return os.path.join(get_test_dir(), "config", file_name) def check_server_started(idx=1): - log_file = os.path.join(get_test_dir(), "logs", f"ignite-log-{idx}.txt") - if not os.path.exists(log_file): - return False - pattern = re.compile('^Topology snapshot.*') - with open(log_file) as f: - for line in f.readlines(): - if pattern.match(line): - return True + for log_file in get_log_files(idx): + with open(log_file) as f: + for line in f.readlines(): + if pattern.match(line): + return True return False @@ -102,20 +101,33 @@ def kill_process_tree(pid): os.kill(pid, signal.SIGKILL) +templateLoader = jinja2.FileSystemLoader(searchpath=os.path.join(get_test_dir(), "config")) +templateEnv = jinja2.Environment(loader=templateLoader) + + +def create_config_file(tpl_name, file_name, **kwargs): + template = templateEnv.get_template(tpl_name) + with open(os.path.join(get_test_dir(), "config", file_name), mode='w') as f: + f.write(template.render(**kwargs)) + + def _start_ignite(idx=1, debug=False, use_ssl=False): clear_logs(idx) runner = get_ignite_runner() env = os.environ.copy() - env['IGNITE_INSTANCE_INDEX'] = str(idx) - env['IGNITE_CLIENT_PORT'] = str(10800 + idx) if debug: env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \ "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 " - ignite_cmd = [runner, get_ignite_config_path(use_ssl)] + params = {'ignite_instance_idx': str(idx), 'ignite_client_port': 10800 + idx, 'use_ssl': use_ssl} + + create_config_file('log4j.xml.jinja2', f'log4j-{idx}.xml', **params) + create_config_file('ignite-config.xml.jinja2', f'ignite-config-{idx}.xml', **params) + + ignite_cmd = [runner, os.path.join(get_test_dir(), "config", f'ignite-config-{idx}.xml')] print("Starting Ignite server node:", ignite_cmd) srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir()) @@ -142,38 +154,3 @@ def get_log_files(idx=1): def clear_logs(idx=1): for f in get_log_files(idx): os.remove(f) - - -def read_log_file(file, idx): - i = -1 - with open(file) as f: - lines = f.readlines() - for line in lines: - i += 1 - - if i < read_log_file.last_line[idx]: - continue - - if i > read_log_file.last_line[idx]: - read_log_file.last_line[idx] = i - - # Example: Client request received [reqId=1, addr=/127.0.0.1:51694, - # req=org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutRequest@1f33101e] - res = re.match("Client request received .*?req=org.apache.ignite.internal.processors." - "platform.client.cache.ClientCache([a-zA-Z]+)Request@", line) - - if res is not None: - yield res.group(1) - - -def get_request_grid_idx(message="Get"): - res = -1 - for i in range(1, 5): - for log_file in get_log_files(i): - for log in read_log_file(log_file, i): - if log == message: - res = i # Do not exit early to advance all log positions - return res - - -read_log_file.last_line = [0, 0, 0, 0, 0] \ No newline at end of file diff --git a/tox.ini b/tox.ini index 69db226..4361413 100644 --- a/tox.ini +++ b/tox.ini @@ -34,6 +34,10 @@ usedevelop = True commands = pytest {env:PYTESTARGS:} {posargs} +[jenkins] +setenv: + PYTESTARGS = --junitxml=junit-{envname}.xml + [no-ssl] setenv: PYTEST_ADDOPTS = --examples @@ -54,3 +58,18 @@ setenv: {[ssl]setenv} [testenv:py{36,37,38}-ssl-password] setenv: {[ssl-password]setenv} + +[testenv:py{36,37,38}-jenkins-no-ssl] +setenv: + {[no-ssl]setenv} + {[jenkins]setenv} + +[testenv:py{36,37,38}-jenkins-ssl] +setenv: + {[ssl]setenv} + {[jenkins]setenv} + +[testenv:py{36,37,38}-jenkins-ssl-password] +setenv: + {[ssl-password]setenv} + {[jenkins]setenv}