Skip to content

Commit b54973c

Browse files
committed
IGNITE-13967 Optimize allocations during parsing
1 parent cae9304 commit b54973c

20 files changed

+250
-212
lines changed

pyignite/api/affinity.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -55,11 +55,11 @@
5555
partition_mapping = StructArray([
5656
('is_applicable', Bool),
5757

58-
('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
58+
('cache_mapping', Conditional(lambda stream, ctx: stream.mem_view(*ctx['is_applicable']) == b'\x01',
5959
lambda ctx: ctx['is_applicable'] is True,
6060
cache_mapping, empty_cache_mapping)),
6161

62-
('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
62+
('node_mapping', Conditional(lambda stream, ctx: stream.mem_view(*ctx['is_applicable']) == b'\x01',
6363
lambda ctx: ctx['is_applicable'] is True,
6464
node_mapping, empty_node_mapping)),
6565
])

pyignite/api/binary.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,31 +57,35 @@ def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=N
5757
following=[('type_exists', Bool)])
5858

5959
with BinaryStream(conn.recv(), conn) as stream:
60-
response_head_type, recv_buffer = response_head_struct.parse(stream)
61-
response_head = response_head_type.from_buffer_copy(recv_buffer)
60+
response_head_type, response_positions = response_head_struct.parse(stream)
61+
response_head = response_head_type.from_buffer_copy(stream.mem_view(*response_positions))
62+
init_pos, total_len = response_positions
63+
6264
response_parts = []
6365
if response_head.type_exists:
64-
resp_body_type, resp_body_buffer = body_struct.parse(stream)
66+
resp_body_type, resp_body_positions = body_struct.parse(stream)
6567
response_parts.append(('body', resp_body_type))
66-
resp_body = resp_body_type.from_buffer_copy(resp_body_buffer)
67-
recv_buffer += resp_body_buffer
68+
resp_body = resp_body_type.from_buffer_copy(stream.mem_view(*resp_body_positions))
69+
total_len += resp_body_positions[1]
6870
if resp_body.is_enum:
69-
resp_enum, resp_enum_buffer = enum_struct.parse(stream)
71+
resp_enum, resp_enum_positions = enum_struct.parse(stream)
7072
response_parts.append(('enums', resp_enum))
71-
recv_buffer += resp_enum_buffer
72-
resp_schema_type, resp_schema_buffer = schema_struct.parse(stream)
73+
total_len += resp_enum_positions[1]
74+
75+
resp_schema_type, resp_schema_positions = schema_struct.parse(stream)
7376
response_parts.append(('schema', resp_schema_type))
74-
recv_buffer += resp_schema_buffer
75-
76-
response_class = type(
77-
'GetBinaryTypeResponse',
78-
(response_head_type,),
79-
{
80-
'_pack_': 1,
81-
'_fields_': response_parts,
82-
}
83-
)
84-
response = response_class.from_buffer_copy(recv_buffer)
77+
total_len += resp_schema_positions[1]
78+
79+
response_class = type(
80+
'GetBinaryTypeResponse',
81+
(response_head_type,),
82+
{
83+
'_pack_': 1,
84+
'_fields_': response_parts,
85+
}
86+
)
87+
response = response_class.from_buffer_copy(stream.mem_view(init_pos, total_len))
88+
8589
result = APIResult(response)
8690
if result.status != 0:
8791
return result

pyignite/binary.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def _from_python(self, stream):
169169
stream.seek(initial_pos + header.schema_offset)
170170
stream.write(schema)
171171

172+
self._buffer = bytes(stream.mem_view(initial_pos, stream.tell() - initial_pos))
172173
self._hashcode = header.hash_code
173174

174175
def _setattr(self, attr_name: str, attr_value: Any):

pyignite/connection/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,9 @@ def read_response(self) -> Union[dict, OrderedDict]:
205205
('length', Int),
206206
('op_code', Byte),
207207
])
208-
with BinaryStream(self.recv(), self) as response_stream:
209-
start_class, start_buffer = response_start.parse(response_stream)
210-
start = start_class.from_buffer_copy(start_buffer)
208+
with BinaryStream(self.recv(), self) as stream:
209+
start_class, start_positions = response_start.parse(stream)
210+
start = start_class.from_buffer_copy(stream.mem_view(*start_positions))
211211
data = response_start.to_python(start)
212212
response_end = None
213213
if data['op_code'] == 0:
@@ -222,8 +222,8 @@ def read_response(self) -> Union[dict, OrderedDict]:
222222
('node_uuid', UUIDObject),
223223
])
224224
if response_end:
225-
end_class, end_buffer = response_end.parse(response_stream)
226-
end = end_class.from_buffer_copy(end_buffer)
225+
end_class, end_positions = response_end.parse(stream)
226+
end = end_class.from_buffer_copy(stream.mem_view(*end_positions))
227227
data.update(response_end.to_python(end))
228228
return data
229229

pyignite/datatypes/cache_properties.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,12 @@ def build_header(cls):
9393

9494
@classmethod
9595
def parse(cls, stream):
96+
init_pos = stream.tell()
97+
9698
header_class = cls.build_header()
97-
buf = stream.read(ctypes.sizeof(header_class))
98-
data_class, data_buf = cls.prop_data_class.parse(stream)
99-
buf += data_buf
99+
header_len = ctypes.sizeof(header_class)
100+
data_class, data_buf = cls.prop_data_class.parse(stream.mem_view(init_pos, header_len))
101+
100102
prop_class = type(
101103
cls.__name__,
102104
(header_class,),
@@ -107,7 +109,9 @@ def parse(cls, stream):
107109
],
108110
}
109111
)
110-
return prop_class, buf
112+
113+
stream.seek(init_pos + ctypes.sizeof(prop_class))
114+
return prop_class, (init_pos, stream.tell() - init_pos)
111115

112116
@classmethod
113117
def to_python(cls, ctype_object, *args, **kwargs):

pyignite/datatypes/complex.py

Lines changed: 57 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
from collections import OrderedDict
1717
import ctypes
18-
import inspect
18+
from io import SEEK_CUR
1919
from typing import Iterable, Dict
2020

2121
from pyignite.constants import *
@@ -71,19 +71,20 @@ def build_header(cls):
7171

7272
@classmethod
7373
def parse(cls, stream):
74-
buffer = stream.read(ctypes.sizeof(ctypes.c_byte))
74+
init_pos, type_len = stream.tell(), ctypes.sizeof(ctypes.c_byte)
7575

76+
buffer = stream.read(type_len)
7677
if buffer == TC_NULL:
77-
return Null.build_c_type(), buffer
78+
return Null.build_c_type(), (init_pos, type_len)
7879

7980
header_class = cls.build_header()
80-
buffer += stream.read(ctypes.sizeof(header_class) - len(buffer))
81-
header = header_class.from_buffer_copy(buffer)
82-
fields = []
81+
header_len = ctypes.sizeof(header_class)
82+
header = header_class.from_buffer_copy(stream.mem_view(init_pos, header_len))
83+
stream.seek(init_pos + header_len)
8384

85+
fields = []
8486
for i in range(header.length):
85-
c_type, buffer_fragment = AnyDataObject.parse(stream)
86-
buffer += buffer_fragment
87+
c_type, _ = AnyDataObject.parse(stream)
8788
fields.append(('element_{}'.format(i), c_type))
8889

8990
final_class = type(
@@ -94,7 +95,9 @@ def parse(cls, stream):
9495
'_fields_': fields,
9596
}
9697
)
97-
return final_class, buffer
98+
99+
stream.seek(init_pos + ctypes.sizeof(final_class))
100+
return final_class, (init_pos, stream.tell() - init_pos)
98101

99102
@classmethod
100103
def to_python(cls, ctype_object, *args, **kwargs):
@@ -164,14 +167,16 @@ def build_header(cls):
164167

165168
@classmethod
166169
def parse(cls, stream):
167-
buffer = stream.read(ctypes.sizeof(ctypes.c_byte))
170+
init_pos, type_len = stream.tell(), ctypes.sizeof(ctypes.c_byte)
168171

172+
buffer = stream.read(type_len)
169173
if buffer == TC_NULL:
170-
return Null.build_c_type(), buffer
174+
return Null.build_c_type(), (init_pos, type_len)
171175

172176
header_class = cls.build_header()
173-
buffer += stream.read(ctypes.sizeof(header_class) - len(buffer))
174-
header = header_class.from_buffer_copy(buffer)
177+
header_len = ctypes.sizeof(header_class)
178+
header = header_class.from_buffer_copy(stream.mem_view(init_pos, header_len))
179+
stream.seek(init_pos + header_len)
175180

176181
final_class = type(
177182
cls.__name__,
@@ -184,10 +189,9 @@ def parse(cls, stream):
184189
],
185190
}
186191
)
187-
buffer += stream.read(
188-
ctypes.sizeof(final_class) - ctypes.sizeof(header_class)
189-
)
190-
return final_class, buffer
192+
193+
stream.seek(init_pos + ctypes.sizeof(final_class))
194+
return final_class, (init_pos, stream.tell() - init_pos)
191195

192196
@classmethod
193197
def to_python(cls, ctype_object, *args, **kwargs):
@@ -262,19 +266,21 @@ def build_header(cls):
262266

263267
@classmethod
264268
def parse(cls, stream):
265-
buffer = stream.read(ctypes.sizeof(ctypes.c_byte))
269+
init_pos, type_len = stream.tell(), ctypes.sizeof(ctypes.c_byte)
266270

267-
if buffer == TC_NULL:
268-
return Null.build_c_type(), buffer
271+
type_ = stream.mem_view(init_pos, type_len)
272+
if type_ == TC_NULL:
273+
stream.seek(type_len, SEEK_CUR)
274+
return Null.build_c_type(), (init_pos, type_len)
269275

270276
header_class = cls.build_header()
271-
buffer += stream.read(ctypes.sizeof(header_class) - len(buffer))
272-
header = header_class.from_buffer_copy(buffer)
273-
fields = []
277+
header_len = ctypes.sizeof(header_class)
278+
header = header_class.from_buffer_copy(stream.mem_view(init_pos, header_len))
279+
stream.seek(init_pos + header_len)
274280

281+
fields = []
275282
for i in range(header.length):
276-
c_type, buffer_fragment = AnyDataObject.parse(stream)
277-
buffer += buffer_fragment
283+
c_type, _ = AnyDataObject.parse(stream)
278284
fields.append(('element_{}'.format(i), c_type))
279285

280286
final_class = type(
@@ -285,7 +291,7 @@ def parse(cls, stream):
285291
'_fields_': fields,
286292
}
287293
)
288-
return final_class, buffer
294+
return final_class, (init_pos, stream.tell() - init_pos)
289295

290296
@classmethod
291297
def to_python(cls, ctype_object, *args, **kwargs):
@@ -360,19 +366,20 @@ def build_header(cls):
360366

361367
@classmethod
362368
def parse(cls, stream):
363-
buffer = stream.read(ctypes.sizeof(ctypes.c_byte))
369+
init_pos, type_len = stream.tell(), ctypes.sizeof(ctypes.c_byte)
364370

371+
buffer = stream.read(type_len)
365372
if buffer == TC_NULL:
366-
return Null.build_c_type(), buffer
373+
return Null.build_c_type(), (init_pos, type_len)
367374

368375
header_class = cls.build_header()
369-
buffer += stream.read(ctypes.sizeof(header_class) - len(buffer))
370-
header = header_class.from_buffer_copy(buffer)
371-
fields = []
376+
header_len = ctypes.sizeof(header_class)
377+
header = header_class.from_buffer_copy(stream.mem_view(init_pos, header_len))
378+
stream.seek(init_pos + header_len)
372379

380+
fields = []
373381
for i in range(header.length << 1):
374-
c_type, buffer_fragment = AnyDataObject.parse(stream)
375-
buffer += buffer_fragment
382+
c_type, _ = AnyDataObject.parse(stream)
376383
fields.append(('element_{}'.format(i), c_type))
377384

378385
final_class = type(
@@ -383,7 +390,7 @@ def parse(cls, stream):
383390
'_fields_': fields,
384391
}
385392
)
386-
return final_class, buffer
393+
return final_class, (init_pos, stream.tell() - init_pos)
387394

388395
@classmethod
389396
def to_python(cls, ctype_object, *args, **kwargs):
@@ -541,27 +548,30 @@ def schema_type(cls, flags: int):
541548
@classmethod
542549
def parse(cls, stream):
543550
from pyignite.datatypes import Struct
544-
buffer = stream.read(ctypes.sizeof(ctypes.c_byte))
545551

546-
if buffer == TC_NULL:
547-
return Null.build_c_type(), buffer
552+
init_pos, type_len = stream.tell(), ctypes.sizeof(ctypes.c_byte)
553+
554+
type_ = stream.mem_view(init_pos, type_len)
555+
if type_ == TC_NULL:
556+
stream.seek(type_len, SEEK_CUR)
557+
return Null.build_c_type(), (init_pos, type_len)
548558

549559
header_class = cls.build_header()
550-
buffer += stream.read(ctypes.sizeof(header_class) - len(buffer))
551-
header = header_class.from_buffer_copy(buffer)
560+
header_len = ctypes.sizeof(header_class)
561+
header = header_class.from_buffer_copy(stream.mem_view(init_pos, header_len))
562+
stream.seek(init_pos + header_len)
552563

553564
# ignore full schema, always retrieve fields' types and order
554565
# from complex types registry
555566
data_class = stream.get_dataclass(header)
556567
fields = data_class.schema.items()
557568
object_fields_struct = Struct(fields)
558-
object_fields, object_fields_buffer = object_fields_struct.parse(stream)
559-
buffer += object_fields_buffer
569+
object_fields, _ = object_fields_struct.parse(stream)
560570
final_class_fields = [('object_fields', object_fields)]
561571

562572
if header.flags & cls.HAS_SCHEMA:
563573
schema = cls.schema_type(header.flags) * len(fields)
564-
buffer += stream.read(ctypes.sizeof(schema))
574+
stream.seek(ctypes.sizeof(schema), SEEK_CUR)
565575
final_class_fields.append(('schema', schema))
566576

567577
final_class = type(
@@ -574,7 +584,7 @@ def parse(cls, stream):
574584
)
575585
# register schema encoding approach
576586
stream.compact_footer = bool(header.flags & cls.COMPACT_FOOTER)
577-
return final_class, buffer
587+
return final_class, (init_pos, stream.tell() - init_pos)
578588

579589
@classmethod
580590
def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs):
@@ -610,4 +620,7 @@ def from_python(cls, stream, value):
610620
return
611621

612622
stream.register_binary_type(value.__class__)
613-
value._from_python(stream)
623+
if getattr(value, '_buffer', None):
624+
stream.write(value._buffer)
625+
else:
626+
value._from_python(stream)

0 commit comments

Comments
 (0)