Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
.eggs
.pytest_cache
.tox
tests/config/*.xml
junit*.xml
pyignite.egg-info
ignite-log-*
__pycache__
__pycache__
8 changes: 4 additions & 4 deletions pyignite/api/affinity.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@
partition_mapping = StructArray([
('is_applicable', Bool),

('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
lambda ctx: ctx['is_applicable'] is True,
('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
lambda ctx: ctx['is_applicable'],
cache_mapping, empty_cache_mapping)),

('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
lambda ctx: ctx['is_applicable'] is True,
('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
lambda ctx: ctx['is_applicable'],
node_mapping, empty_node_mapping)),
])

Expand Down
70 changes: 36 additions & 34 deletions pyignite/api/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@
from pyignite.queries.op_codes import *
from pyignite.utils import int_overflow, entity_id
from .result import APIResult
from ..stream import BinaryStream, READ_BACKWARD
from ..queries.response import Response


def get_binary_type(
connection: 'Connection', binary_type: Union[str, int], query_id=None,
) -> APIResult:
def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult:
"""
Gets the binary type information by type ID.

:param connection: connection to Ignite server,
:param conn: connection to Ignite server,
:param binary_type: binary type name or ID,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
Expand All @@ -49,39 +48,42 @@ def get_binary_type(
query_id=query_id,
)

_, send_buffer = query_struct.from_python({
'type_id': entity_id(binary_type),
})
connection.send(send_buffer)
with BinaryStream(conn) as stream:
query_struct.from_python(stream, {
'type_id': entity_id(binary_type),
})
conn.send(stream.getbuffer())

response_head_struct = Response(protocol_version=connection.get_protocol_version(),
response_head_struct = Response(protocol_version=conn.get_protocol_version(),
following=[('type_exists', Bool)])

response_head_type, recv_buffer = response_head_struct.parse(connection)
response_head = response_head_type.from_buffer_copy(recv_buffer)
response_parts = []
if response_head.type_exists:
resp_body_type, resp_body_buffer = body_struct.parse(connection)
response_parts.append(('body', resp_body_type))
resp_body = resp_body_type.from_buffer_copy(resp_body_buffer)
recv_buffer += resp_body_buffer
if resp_body.is_enum:
resp_enum, resp_enum_buffer = enum_struct.parse(connection)
response_parts.append(('enums', resp_enum))
recv_buffer += resp_enum_buffer
resp_schema_type, resp_schema_buffer = schema_struct.parse(connection)
response_parts.append(('schema', resp_schema_type))
recv_buffer += resp_schema_buffer

response_class = type(
'GetBinaryTypeResponse',
(response_head_type,),
{
'_pack_': 1,
'_fields_': response_parts,
}
)
response = response_class.from_buffer_copy(recv_buffer)
with BinaryStream(conn, conn.recv()) as stream:
init_pos = stream.tell()
response_head_type = response_head_struct.parse(stream)
response_head = stream.read_ctype(response_head_type, direction=READ_BACKWARD)

response_parts = []
if response_head.type_exists:
resp_body_type = body_struct.parse(stream)
response_parts.append(('body', resp_body_type))
resp_body = stream.read_ctype(resp_body_type, direction=READ_BACKWARD)
if resp_body.is_enum:
resp_enum = enum_struct.parse(stream)
response_parts.append(('enums', resp_enum))

resp_schema_type = schema_struct.parse(stream)
response_parts.append(('schema', resp_schema_type))

response_class = type(
'GetBinaryTypeResponse',
(response_head_type,),
{
'_pack_': 1,
'_fields_': response_parts,
}
)
response = stream.read_ctype(response_class, position=init_pos)

result = APIResult(response)
if result.status != 0:
return result
Expand Down
47 changes: 25 additions & 22 deletions pyignite/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,17 @@ def __new__(
mcs, name, (GenericObjectProps, )+base_classes, namespace
)

def _build(self, client: 'Client' = None) -> int:
def _from_python(self, stream, save_to_buf=False):
"""
Method for building binary representation of the Generic object
and calculating a hashcode from it.

:param self: Generic object instance,
:param client: (optional) connection to Ignite cluster,
:param stream: BinaryStream
:param save_to_buf: Optional. If True, save serialized data to buffer.
"""
if client is None:
compact_footer = True
else:
compact_footer = client.compact_footer

compact_footer = stream.compact_footer

# prepare header
header_class = BinaryObject.build_header()
Expand All @@ -129,18 +128,19 @@ def _build(self, client: 'Client' = None) -> int:
header.type_id = self.type_id
header.schema_id = self.schema_id

header_len = ctypes.sizeof(header_class)
initial_pos = stream.tell()

# create fields and calculate offsets
offsets = [ctypes.sizeof(header_class)]
field_buffer = bytearray()
schema_items = list(self.schema.items())

stream.seek(initial_pos + header_len)
for field_name, field_type in schema_items:
partial_buffer = field_type.from_python(
getattr(
self, field_name, getattr(field_type, 'default', None)
)
)
offsets.append(max(offsets) + len(partial_buffer))
field_buffer += partial_buffer
val = getattr(self, field_name, getattr(field_type, 'default', None))
field_start_pos = stream.tell()
field_type.from_python(stream, val)
offsets.append(max(offsets) + stream.tell() - field_start_pos)

offsets = offsets[:-1]

Expand All @@ -160,15 +160,18 @@ def _build(self, client: 'Client' = None) -> int:
schema[i].offset = offset

# calculate size and hash code
header.schema_offset = (
ctypes.sizeof(header_class)
+ len(field_buffer)
)
fields_data_len = stream.tell() - initial_pos - header_len
header.schema_offset = fields_data_len + header_len
header.length = header.schema_offset + ctypes.sizeof(schema_class)
header.hash_code = hashcode(field_buffer)
header.hash_code = stream.hashcode(initial_pos + header_len, fields_data_len)

stream.seek(initial_pos)
stream.write(header)
stream.seek(initial_pos + header.schema_offset)
stream.write(schema)

# reuse the results
self._buffer = bytes(header) + field_buffer + bytes(schema)
if save_to_buf:
self._buffer = bytes(stream.mem_view(initial_pos, stream.tell() - initial_pos))
self._hashcode = header.hash_code

def _setattr(self, attr_name: str, attr_value: Any):
Expand All @@ -180,7 +183,7 @@ def _setattr(self, attr_name: str, attr_value: Any):
# `super()` is really need these parameters
super(result, self).__setattr__(attr_name, attr_value)

setattr(result, _build.__name__, _build)
setattr(result, _from_python.__name__, _from_python)
setattr(result, '__setattr__', _setattr)
setattr(result, '_buffer', None)
setattr(result, '_hashcode', None)
Expand Down
4 changes: 2 additions & 2 deletions pyignite/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import Any, Dict, Iterable, Optional, Tuple, Union

from .constants import *
from .binary import GenericObjectMeta
from .binary import GenericObjectMeta, unwrap_binary
from .datatypes import prop_codes
from .datatypes.internal import AnyDataObject
from .exceptions import (
Expand All @@ -26,7 +26,7 @@
)
from .utils import (
cache_id, get_field_by_id, is_wrapped,
status_to_exception, unsigned, unwrap_binary,
status_to_exception, unsigned
)
from .api.cache_config import (
cache_create, cache_create_with_config,
Expand Down
Loading