From 3467ef719284ae72e815269ca0ff6fa897e9cab9 Mon Sep 17 00:00:00 2001 From: Ivan Dashchinskiy Date: Fri, 12 Feb 2021 12:24:20 +0300 Subject: [PATCH 1/2] IGNITE-14167 Simplify reconnecting, fix affinity topology change detection. --- .travis.yml | 24 +- pyignite/cache.py | 4 + pyignite/client.py | 8 +- pyignite/connection/__init__.py | 410 +------------------------ pyignite/connection/connection.py | 381 +++++++++++++++++++++++ pyignite/datatypes/complex.py | 2 +- pyignite/queries/query.py | 8 +- pyignite/stream/binary_stream.py | 5 +- pyignite/utils.py | 25 -- tests/config/log4j.xml.jinja2 | 1 - tests/conftest.py | 4 +- tests/test_affinity_request_routing.py | 8 +- tox.ini | 6 - 13 files changed, 423 insertions(+), 463 deletions(-) create mode 100644 pyignite/connection/connection.py diff --git a/.travis.yml b/.travis.yml index f884bdb..3095941 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +language: python sudo: required addons: @@ -21,7 +22,9 @@ addons: - openjdk-8-jdk env: - - IGNITE_VERSION=2.9.1 IGNITE_HOME=/opt/ignite + global: + - IGNITE_VERSION=2.9.1 + - IGNITE_HOME=/opt/ignite before_install: - curl -L https://apache-mirror.rbc.ru/pub/apache/ignite/${IGNITE_VERSION}/apache-ignite-slim-${IGNITE_VERSION}-bin.zip > ignite.zip @@ -29,10 +32,17 @@ before_install: - mv /opt/apache-ignite-slim-${IGNITE_VERSION}-bin /opt/ignite - mv /opt/ignite/libs/optional/ignite-log4j2 /opt/ignite/libs/ -language: python -python: - - "3.6" - - "3.7" - - "3.8" -install: pip install tox-travis +jobs: + include: + - python: '3.6' + arch: amd64 + env: TOXENV=py36-no-ssl,py36-ssl,py36-ssl-password + - python: '3.7' + arch: amd64 + env: TOXENV=py37-no-ssl,py37-ssl,py37-ssl-password + - python: '3.8' + arch: amd64 + env: TOXENV=py38-no-ssl,py38-ssl,py38-ssl-password + +install: pip install tox script: tox \ No newline at end of file diff --git a/pyignite/cache.py b/pyignite/cache.py index dd7dac4..ea672a8 100644 --- a/pyignite/cache.py +++ b/pyignite/cache.py @@ -283,6 +283,10 @@ def get_best_node( parts += len(p) self.affinity['number_of_partitions'] = parts + + for conn in self.client._nodes: + if not conn.alive: + conn.reconnect() else: # get number of partitions parts = self.affinity.get('number_of_partitions') diff --git a/pyignite/client.py b/pyignite/client.py index 83cb196..77c6373 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -182,15 +182,9 @@ def connect(self, *args): if not self.partition_aware: # do not try to open more nodes self._current_node = i - else: - # take a chance to schedule the reconnection - # for all the failed connections, that was probed - # before this - for failed_node in self._nodes[:i]: - failed_node.reconnect() except connection_errors: - conn._fail() + conn.failed = True if self.partition_aware: # schedule the reconnection conn.reconnect() diff --git a/pyignite/connection/__init__.py b/pyignite/connection/__init__.py index 0e793f8..1114594 100644 --- a/pyignite/connection/__init__.py +++ b/pyignite/connection/__init__.py @@ -33,414 +33,6 @@ as well as Ignite protocol handshaking. """ -from collections import OrderedDict -import socket -from threading import RLock -from typing import Union - -from pyignite.constants import * -from pyignite.exceptions import ( - HandshakeError, ParameterError, SocketError, connection_errors, -) -from pyignite.datatypes import Byte, Int, Short, String, UUIDObject -from pyignite.datatypes.internal import Struct -from pyignite.utils import DaemonicTimer - -from .handshake import HandshakeRequest -from .ssl import wrap - +from .connection import Connection __all__ = ['Connection'] - -from ..stream import BinaryStream, READ_BACKWARD - - -class Connection: - """ - This is a `pyignite` class, that represents a connection to Ignite - node. It serves multiple purposes: - - * socket wrapper. Detects fragmentation and network errors. See also - https://docs.python.org/3/howto/sockets.html, - * binary protocol connector. Incapsulates handshake and failover reconnection. - """ - - _socket = None - _failed = None - _in_use = None - - client = None - host = None - port = None - timeout = None - username = None - password = None - ssl_params = {} - uuid = None - - @staticmethod - def _check_ssl_params(params): - expected_args = [ - 'use_ssl', - 'ssl_version', - 'ssl_ciphers', - 'ssl_cert_reqs', - 'ssl_keyfile', - 'ssl_keyfile_password', - 'ssl_certfile', - 'ssl_ca_certfile', - ] - for param in params: - if param not in expected_args: - raise ParameterError(( - 'Unexpected parameter for connection initialization: `{}`' - ).format(param)) - - def __init__( - self, client: 'Client', timeout: float = 2.0, - username: str = None, password: str = None, **ssl_params - ): - """ - Initialize connection. - - For the use of the SSL-related parameters see - https://docs.python.org/3/library/ssl.html#ssl-certificates. - - :param client: Ignite client object, - :param timeout: (optional) sets timeout (in seconds) for each socket - operation including `connect`. 0 means non-blocking mode, which is - virtually guaranteed to fail. Can accept integer or float value. - Default is None (blocking mode), - :param use_ssl: (optional) set to True if Ignite server uses SSL - on its binary connector. Defaults to use SSL when username - and password has been supplied, not to use SSL otherwise, - :param ssl_version: (optional) SSL version constant from standard - `ssl` module. Defaults to TLS v1.1, as in Ignite 2.5, - :param ssl_ciphers: (optional) ciphers to use. If not provided, - `ssl` default ciphers are used, - :param ssl_cert_reqs: (optional) determines how the remote side - certificate is treated: - - * `ssl.CERT_NONE` − remote certificate is ignored (default), - * `ssl.CERT_OPTIONAL` − remote certificate will be validated, - if provided, - * `ssl.CERT_REQUIRED` − valid remote certificate is required, - - :param ssl_keyfile: (optional) a path to SSL key file to identify - local (client) party, - :param ssl_keyfile_password: (optional) password for SSL key file, - can be provided when key file is encrypted to prevent OpenSSL - password prompt, - :param ssl_certfile: (optional) a path to ssl certificate file - to identify local (client) party, - :param ssl_ca_certfile: (optional) a path to a trusted certificate - or a certificate chain. Required to check the validity of the remote - (server-side) certificate, - :param username: (optional) user name to authenticate to Ignite - cluster, - :param password: (optional) password to authenticate to Ignite cluster. - """ - self.client = client - self.timeout = timeout - self.username = username - self.password = password - self._check_ssl_params(ssl_params) - if self.username and self.password and 'use_ssl' not in ssl_params: - ssl_params['use_ssl'] = True - self.ssl_params = ssl_params - self._failed = False - self._mux = RLock() - self._in_use = False - - @property - def socket(self) -> socket.socket: - """ Network socket. """ - return self._socket - - @property - def closed(self) -> bool: - """ Tells if socket is closed. """ - with self._mux: - return self._socket is None - - @property - def failed(self) -> bool: - """ Tells if connection is failed. """ - with self._mux: - return self._failed - - @property - def alive(self) -> bool: - """ Tells if connection is up and no failure detected. """ - with self._mux: - return not (self._failed or self.closed) - - def __repr__(self) -> str: - return '{}:{}'.format(self.host or '?', self.port or '?') - - _wrap = wrap - - def get_protocol_version(self): - """ - Returns the tuple of major, minor, and revision numbers of the used - thin protocol version, or None, if no connection to the Ignite cluster - was yet established. - """ - return self.client.protocol_version - - def _fail(self): - """ set client to failed state. """ - with self._mux: - self._failed = True - - self._in_use = False - - def read_response(self) -> Union[dict, OrderedDict]: - """ - Processes server's response to the handshake request. - - :return: handshake data. - """ - response_start = Struct([ - ('length', Int), - ('op_code', Byte), - ]) - with BinaryStream(self, self.recv()) as stream: - start_class = response_start.parse(stream) - start = stream.read_ctype(start_class, direction=READ_BACKWARD) - data = response_start.to_python(start) - response_end = None - if data['op_code'] == 0: - response_end = Struct([ - ('version_major', Short), - ('version_minor', Short), - ('version_patch', Short), - ('message', String), - ]) - elif self.get_protocol_version() >= (1, 4, 0): - response_end = Struct([ - ('node_uuid', UUIDObject), - ]) - if response_end: - end_class = response_end.parse(stream) - end = stream.read_ctype(end_class, direction=READ_BACKWARD) - data.update(response_end.to_python(end)) - return data - - def connect( - self, host: str = None, port: int = None - ) -> Union[dict, OrderedDict]: - """ - Connect to the given server node with protocol version fallback. - - :param host: Ignite server node's host name or IP, - :param port: Ignite server node's port number. - """ - detecting_protocol = False - - with self._mux: - if self._in_use: - raise ConnectionError('Connection is in use.') - self._in_use = True - - # choose highest version first - if self.client.protocol_version is None: - detecting_protocol = True - self.client.protocol_version = max(PROTOCOLS) - - try: - result = self._connect_version(host, port) - except HandshakeError as e: - if e.expected_version in PROTOCOLS: - self.client.protocol_version = e.expected_version - result = self._connect_version(host, port) - else: - raise e - except connection_errors: - # restore undefined protocol version - if detecting_protocol: - self.client.protocol_version = None - raise - - # connection is ready for end user - self.uuid = result.get('node_uuid', None) # version-specific (1.4+) - - self._failed = False - return result - - def _connect_version( - self, host: str = None, port: int = None, - ) -> Union[dict, OrderedDict]: - """ - Connect to the given server node using protocol version - defined on client. - - :param host: Ignite server node's host name or IP, - :param port: Ignite server node's port number. - """ - - host = host or IGNITE_DEFAULT_HOST - port = port or IGNITE_DEFAULT_PORT - - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.settimeout(self.timeout) - self._socket = self._wrap(self.socket) - self._socket.connect((host, port)) - - protocol_version = self.client.protocol_version - - hs_request = HandshakeRequest( - protocol_version, - self.username, - self.password - ) - - with BinaryStream(self) as stream: - hs_request.from_python(stream) - self.send(stream.getbuffer()) - - hs_response = self.read_response() - if hs_response['op_code'] == 0: - # disconnect but keep in use - self.close(release=False) - - error_text = 'Handshake error: {}'.format(hs_response['message']) - # if handshake fails for any reason other than protocol mismatch - # (i.e. authentication error), server version is 0.0.0 - if any([ - hs_response['version_major'], - hs_response['version_minor'], - hs_response['version_patch'], - ]): - error_text += ( - ' Server expects binary protocol version ' - '{version_major}.{version_minor}.{version_patch}. Client ' - 'provides {client_major}.{client_minor}.{client_patch}.' - ).format( - client_major=protocol_version[0], - client_minor=protocol_version[1], - client_patch=protocol_version[2], - **hs_response - ) - raise HandshakeError(( - hs_response['version_major'], - hs_response['version_minor'], - hs_response['version_patch'], - ), error_text) - self.host, self.port = host, port - return hs_response - - def reconnect(self, seq_no=0): - """ - Tries to reconnect synchronously, then in background. - """ - - # stop trying to reconnect - if seq_no >= len(RECONNECT_BACKOFF_SEQUENCE): - self._failed = False - - self._reconnect() - - if self.failed: - DaemonicTimer( - RECONNECT_BACKOFF_SEQUENCE[seq_no], - self.reconnect, - kwargs={'seq_no': seq_no + 1}, - ).start() - - def _reconnect(self): - # do not reconnect if connection is already working - # or was closed on purpose - if not self.failed: - return - - self.close() - - # connect and silence the connection errors - try: - self.connect(self.host, self.port) - except connection_errors: - pass - - def _transfer_params(self, to: 'Connection'): - """ - Transfer non-SSL parameters to target connection object. - - :param to: connection object to transfer parameters to. - """ - to.username = self.username - to.password = self.password - to.client = self.client - to.host = self.host - to.port = self.port - - def send(self, data: Union[bytes, bytearray, memoryview], flags=None): - """ - Send data down the socket. - - :param data: bytes to send, - :param flags: (optional) OS-specific flags. - """ - if self.closed: - raise SocketError('Attempt to use closed connection.') - - kwargs = {} - if flags is not None: - kwargs['flags'] = flags - - try: - self.socket.sendall(data, **kwargs) - except Exception: - self._fail() - self.reconnect() - raise - - def recv(self, flags=None) -> bytearray: - def _recv(buffer, num_bytes): - bytes_to_receive = num_bytes - while bytes_to_receive > 0: - try: - bytes_rcvd = self.socket.recv_into(buffer, bytes_to_receive, **kwargs) - if bytes_rcvd == 0: - raise SocketError('Connection broken.') - except connection_errors: - self._fail() - self.reconnect() - raise - - buffer = buffer[bytes_rcvd:] - bytes_to_receive -= bytes_rcvd - - if self.closed: - raise SocketError('Attempt to use closed connection.') - - kwargs = {} - if flags is not None: - kwargs['flags'] = flags - - data = bytearray(4) - _recv(memoryview(data), 4) - response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER) - - data.extend(bytearray(response_len)) - _recv(memoryview(data)[4:], response_len) - return data - - - def close(self, release=True): - """ - Try to mark socket closed, then unlink it. This is recommended but - not required, since sockets are automatically closed when - garbage-collected. - """ - with self._mux: - if self._socket: - try: - self._socket.shutdown(socket.SHUT_RDWR) - self._socket.close() - except connection_errors: - pass - self._socket = None - - if release: - self._in_use = False diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py new file mode 100644 index 0000000..6ab6c6a --- /dev/null +++ b/pyignite/connection/connection.py @@ -0,0 +1,381 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import OrderedDict +import socket +from typing import Union + +from pyignite.constants import * +from pyignite.exceptions import ( + HandshakeError, ParameterError, SocketError, connection_errors, +) +from pyignite.datatypes import Byte, Int, Short, String, UUIDObject +from pyignite.datatypes.internal import Struct + +from .handshake import HandshakeRequest +from .ssl import wrap +from ..stream import BinaryStream, READ_BACKWARD + + +class Connection: + """ + This is a `pyignite` class, that represents a connection to Ignite + node. It serves multiple purposes: + + * socket wrapper. Detects fragmentation and network errors. See also + https://docs.python.org/3/howto/sockets.html, + * binary protocol connector. Incapsulates handshake and failover reconnection. + """ + + _socket = None + _failed = None + + client = None + host = None + port = None + timeout = None + username = None + password = None + ssl_params = {} + uuid = None + + @staticmethod + def _check_ssl_params(params): + expected_args = [ + 'use_ssl', + 'ssl_version', + 'ssl_ciphers', + 'ssl_cert_reqs', + 'ssl_keyfile', + 'ssl_keyfile_password', + 'ssl_certfile', + 'ssl_ca_certfile', + ] + for param in params: + if param not in expected_args: + raise ParameterError(( + 'Unexpected parameter for connection initialization: `{}`' + ).format(param)) + + def __init__( + self, client: 'Client', timeout: float = 2.0, + username: str = None, password: str = None, **ssl_params + ): + """ + Initialize connection. + + For the use of the SSL-related parameters see + https://docs.python.org/3/library/ssl.html#ssl-certificates. + + :param client: Ignite client object, + :param timeout: (optional) sets timeout (in seconds) for each socket + operation including `connect`. 0 means non-blocking mode, which is + virtually guaranteed to fail. Can accept integer or float value. + Default is None (blocking mode), + :param use_ssl: (optional) set to True if Ignite server uses SSL + on its binary connector. Defaults to use SSL when username + and password has been supplied, not to use SSL otherwise, + :param ssl_version: (optional) SSL version constant from standard + `ssl` module. Defaults to TLS v1.1, as in Ignite 2.5, + :param ssl_ciphers: (optional) ciphers to use. If not provided, + `ssl` default ciphers are used, + :param ssl_cert_reqs: (optional) determines how the remote side + certificate is treated: + + * `ssl.CERT_NONE` − remote certificate is ignored (default), + * `ssl.CERT_OPTIONAL` − remote certificate will be validated, + if provided, + * `ssl.CERT_REQUIRED` − valid remote certificate is required, + + :param ssl_keyfile: (optional) a path to SSL key file to identify + local (client) party, + :param ssl_keyfile_password: (optional) password for SSL key file, + can be provided when key file is encrypted to prevent OpenSSL + password prompt, + :param ssl_certfile: (optional) a path to ssl certificate file + to identify local (client) party, + :param ssl_ca_certfile: (optional) a path to a trusted certificate + or a certificate chain. Required to check the validity of the remote + (server-side) certificate, + :param username: (optional) user name to authenticate to Ignite + cluster, + :param password: (optional) password to authenticate to Ignite cluster. + """ + self.client = client + self.timeout = timeout + self.username = username + self.password = password + self._check_ssl_params(ssl_params) + if self.username and self.password and 'use_ssl' not in ssl_params: + ssl_params['use_ssl'] = True + self.ssl_params = ssl_params + self._failed = False + + @property + def closed(self) -> bool: + """ Tells if socket is closed. """ + return self._socket is None + + @property + def failed(self) -> bool: + """ Tells if connection is failed. """ + return self._failed + + @failed.setter + def failed(self, value): + self._failed = value + + @property + def alive(self) -> bool: + """ Tells if connection is up and no failure detected. """ + return not self.failed and not self.closed + + def __repr__(self) -> str: + return '{}:{}'.format(self.host or '?', self.port or '?') + + _wrap = wrap + + def get_protocol_version(self): + """ + Returns the tuple of major, minor, and revision numbers of the used + thin protocol version, or None, if no connection to the Ignite cluster + was yet established. + """ + return self.client.protocol_version + + def read_response(self) -> Union[dict, OrderedDict]: + """ + Processes server's response to the handshake request. + + :return: handshake data. + """ + response_start = Struct([ + ('length', Int), + ('op_code', Byte), + ]) + with BinaryStream(self, self.recv()) as stream: + start_class = response_start.parse(stream) + start = stream.read_ctype(start_class, direction=READ_BACKWARD) + data = response_start.to_python(start) + response_end = None + if data['op_code'] == 0: + response_end = Struct([ + ('version_major', Short), + ('version_minor', Short), + ('version_patch', Short), + ('message', String), + ]) + elif self.get_protocol_version() >= (1, 4, 0): + response_end = Struct([ + ('node_uuid', UUIDObject), + ]) + if response_end: + end_class = response_end.parse(stream) + end = stream.read_ctype(end_class, direction=READ_BACKWARD) + data.update(response_end.to_python(end)) + return data + + def connect( + self, host: str = None, port: int = None + ) -> Union[dict, OrderedDict]: + """ + Connect to the given server node with protocol version fallback. + + :param host: Ignite server node's host name or IP, + :param port: Ignite server node's port number. + """ + detecting_protocol = False + + # choose highest version first + if self.client.protocol_version is None: + detecting_protocol = True + self.client.protocol_version = max(PROTOCOLS) + + try: + result = self._connect_version(host, port) + except HandshakeError as e: + if e.expected_version in PROTOCOLS: + self.client.protocol_version = e.expected_version + result = self._connect_version(host, port) + else: + raise e + except connection_errors: + # restore undefined protocol version + if detecting_protocol: + self.client.protocol_version = None + raise + + # connection is ready for end user + self.uuid = result.get('node_uuid', None) # version-specific (1.4+) + + self.failed = False + return result + + def _connect_version( + self, host: str = None, port: int = None, + ) -> Union[dict, OrderedDict]: + """ + Connect to the given server node using protocol version + defined on client. + + :param host: Ignite server node's host name or IP, + :param port: Ignite server node's port number. + """ + + host = host or IGNITE_DEFAULT_HOST + port = port or IGNITE_DEFAULT_PORT + + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.settimeout(self.timeout) + self._socket = self._wrap(self._socket) + self._socket.connect((host, port)) + + protocol_version = self.client.protocol_version + + hs_request = HandshakeRequest( + protocol_version, + self.username, + self.password + ) + + with BinaryStream(self) as stream: + hs_request.from_python(stream) + self.send(stream.getbuffer()) + + hs_response = self.read_response() + if hs_response['op_code'] == 0: + self.close() + + error_text = 'Handshake error: {}'.format(hs_response['message']) + # if handshake fails for any reason other than protocol mismatch + # (i.e. authentication error), server version is 0.0.0 + if any([ + hs_response['version_major'], + hs_response['version_minor'], + hs_response['version_patch'], + ]): + error_text += ( + ' Server expects binary protocol version ' + '{version_major}.{version_minor}.{version_patch}. Client ' + 'provides {client_major}.{client_minor}.{client_patch}.' + ).format( + client_major=protocol_version[0], + client_minor=protocol_version[1], + client_patch=protocol_version[2], + **hs_response + ) + raise HandshakeError(( + hs_response['version_major'], + hs_response['version_minor'], + hs_response['version_patch'], + ), error_text) + self.host, self.port = host, port + return hs_response + + def reconnect(self): + # do not reconnect if connection is already working + # or was closed on purpose + if not self.failed: + return + + self.close() + + # connect and silence the connection errors + try: + self.connect(self.host, self.port) + except connection_errors: + pass + + def send(self, data: Union[bytes, bytearray, memoryview], flags=None): + """ + Send data down the socket. + + :param data: bytes to send, + :param flags: (optional) OS-specific flags. + """ + if self.closed: + raise SocketError('Attempt to use closed connection.') + + kwargs = {} + if flags is not None: + kwargs['flags'] = flags + + try: + self._socket.sendall(data, **kwargs) + except connection_errors: + self.failed = True + self.reconnect() + raise + + def recv(self, flags=None) -> bytearray: + def _recv(buffer, num_bytes): + bytes_to_receive = num_bytes + while bytes_to_receive > 0: + try: + bytes_rcvd = self._socket.recv_into(buffer, bytes_to_receive, **kwargs) + if bytes_rcvd == 0: + raise SocketError('Connection broken.') + except connection_errors: + self.failed = True + self.reconnect() + raise + + buffer = buffer[bytes_rcvd:] + bytes_to_receive -= bytes_rcvd + + if self.closed: + raise SocketError('Attempt to use closed connection.') + + kwargs = {} + if flags is not None: + kwargs['flags'] = flags + + data = bytearray(4) + _recv(memoryview(data), 4) + response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER) + + data.extend(bytearray(response_len)) + _recv(memoryview(data)[4:], response_len) + return data + + def close(self): + """ + Try to mark socket closed, then unlink it. This is recommended but + not required, since sockets are automatically closed when + garbage-collected. + """ + if self._socket: + try: + self._socket.shutdown(socket.SHUT_RDWR) + self._socket.close() + except connection_errors: + pass + + self._socket = None diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py index aed3cda..b8d9c02 100644 --- a/pyignite/datatypes/complex.py +++ b/pyignite/datatypes/complex.py @@ -564,8 +564,8 @@ def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs): @classmethod def from_python_not_null(cls, stream, value): - stream.register_binary_type(value.__class__) if getattr(value, '_buffer', None): stream.write(value._buffer) else: + stream.register_binary_type(value.__class__) value._from_python(stream) diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py index 5bd114b..b5be753 100644 --- a/pyignite/queries/query.py +++ b/pyignite/queries/query.py @@ -105,9 +105,11 @@ def perform( # this test depends on protocol version if getattr(response, 'flags', False) & RHF_TOPOLOGY_CHANGED: # update latest affinity version - conn.client.affinity_version = ( - response.affinity_version, response.affinity_minor - ) + new_affinity = (response.affinity_version, response.affinity_minor) + old_affinity = conn.client.affinity_version + + if new_affinity > old_affinity: + conn.client.affinity_version = new_affinity # build result result = APIResult(response) diff --git a/pyignite/stream/binary_stream.py b/pyignite/stream/binary_stream.py index 1ecdcfb..46ac683 100644 --- a/pyignite/stream/binary_stream.py +++ b/pyignite/stream/binary_stream.py @@ -95,7 +95,10 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): - self.stream.close() + try: + self.stream.close() + except BufferError: + pass def get_dataclass(self, header): # get field names from outer space diff --git a/pyignite/utils.py b/pyignite/utils.py index 3d0378f..6c636ae 100644 --- a/pyignite/utils.py +++ b/pyignite/utils.py @@ -18,7 +18,6 @@ import warnings from functools import wraps -from threading import Event, Thread from typing import Any, Optional, Type, Tuple, Union from pyignite.datatypes.base import IgniteDataType @@ -255,30 +254,6 @@ def unsigned(value: int, c_type: ctypes._SimpleCData = ctypes.c_uint) -> int: return c_type(value).value -class DaemonicTimer(Thread): - """ - Same as normal `threading.Timer`, but do not delay the program exit. - """ - - def __init__(self, interval, function, args=None, kwargs=None): - Thread.__init__(self, daemon=True) - self.interval = interval - self.function = function - self.args = args if args is not None else [] - self.kwargs = kwargs if kwargs is not None else {} - self.finished = Event() - - def cancel(self): - """Stop the timer if it hasn't finished yet.""" - self.finished.set() - - def run(self): - self.finished.wait(self.interval) - if not self.finished.is_set(): - self.function(*self.args, **self.kwargs) - self.finished.set() - - def capitalize(string: str) -> str: """ Capitalizing the string, assuming the first character is a letter. diff --git a/tests/config/log4j.xml.jinja2 b/tests/config/log4j.xml.jinja2 index 628f66c..983ae9e 100644 --- a/tests/config/log4j.xml.jinja2 +++ b/tests/config/log4j.xml.jinja2 @@ -33,7 +33,6 @@ - diff --git a/tests/conftest.py b/tests/conftest.py index 54a7fda..bc8804d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -120,7 +120,7 @@ def client_partition_aware_single_server( password ): node = node[:1] - yield from client(node, timeout, True, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile, + yield from client0(node, timeout, True, use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers, ssl_version, username, password) @@ -211,7 +211,7 @@ def pytest_addoption(parser): '--timeout', action='store', type=float, - default=None, + default=2.0, help=( 'Timeout (in seconds) for each socket operation. Can accept ' 'integer or float value. Default is None' diff --git a/tests/test_affinity_request_routing.py b/tests/test_affinity_request_routing.py index 866222b..3489dea 100644 --- a/tests/test_affinity_request_routing.py +++ b/tests/test_affinity_request_routing.py @@ -18,6 +18,7 @@ from pyignite import * from pyignite.connection import Connection +from pyignite.constants import PROTOCOL_BYTE_ORDER from pyignite.datatypes import * from pyignite.datatypes.cache_config import CacheMode from pyignite.datatypes.prop_codes import * @@ -30,7 +31,12 @@ def patched_send(self, *args, **kwargs): """Patched send function that push to queue idx of server to which request is routed.""" - requests.append(self.port % 100) + buf = args[0] + if buf and len(buf) >= 6: + op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER) + # Filter only caches operation. + if 1000 <= op_code < 1100: + requests.append(self.port % 100) return old_send(self, *args, **kwargs) diff --git a/tox.ini b/tox.ini index 4361413..eb7d1a6 100644 --- a/tox.ini +++ b/tox.ini @@ -17,12 +17,6 @@ skipsdist = True envlist = py{36,37,38}-{no-ssl,ssl,ssl-password} -[travis] -python = - 3.6: py36-{no-ssl,ssl,ssl-password} - 3.7: py37-{no-ssl,ssl,ssl-password} - 3.8: py38-{no-ssl,ssl,ssl-password} - [testenv] passenv = TEAMCITY_VERSION IGNITE_HOME envdir = {homedir}/.virtualenvs/pyignite-{envname} From 1726b5044b83eba1d7a3a1a4171f47a27ae59ca8 Mon Sep 17 00:00:00 2001 From: Ivan Dashchinskiy Date: Sat, 13 Feb 2021 14:26:57 +0300 Subject: [PATCH 2/2] IGNITE-14167 Skip unneccesary parsing for Conditional --- pyignite/api/affinity.py | 6 ++++-- pyignite/datatypes/internal.py | 20 ++++++++++++++------ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pyignite/api/affinity.py b/pyignite/api/affinity.py index 16148a1..7d09517 100644 --- a/pyignite/api/affinity.py +++ b/pyignite/api/affinity.py @@ -55,11 +55,13 @@ partition_mapping = StructArray([ ('is_applicable', Bool), - ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1, + ('cache_mapping', Conditional(['is_applicable'], + lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1, lambda ctx: ctx['is_applicable'], cache_mapping, empty_cache_mapping)), - ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1, + ('node_mapping', Conditional(['is_applicable'], + lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1, lambda ctx: ctx['is_applicable'], node_mapping, empty_node_mapping)), ]) diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py index 0111a22..a6da9fe 100644 --- a/pyignite/datatypes/internal.py +++ b/pyignite/datatypes/internal.py @@ -18,7 +18,7 @@ import decimal from datetime import date, datetime, timedelta from io import SEEK_CUR -from typing import Any, Tuple, Union, Callable +from typing import Any, Tuple, Union, Callable, List import uuid import attr @@ -115,8 +115,9 @@ def tc_map(key: bytes, _memo_map: dict = {}): class Conditional: - - def __init__(self, predicate1: Callable[[any], bool], predicate2: Callable[[any], bool], var1, var2): + def __init__(self, fields: List, predicate1: Callable[[any], bool], + predicate2: Callable[[any], bool], var1, var2): + self.fields = fields self.predicate1 = predicate1 self.predicate2 = predicate2 self.var1 = var1 @@ -209,12 +210,19 @@ class Struct: defaults = attr.ib(type=dict, default={}) def parse(self, stream): - fields, values = [], {} + fields, ctx = [], {} + + for _, c_type in self.fields: + if isinstance(c_type, Conditional): + for name in c_type.fields: + ctx[name] = None + for name, c_type in self.fields: is_cond = isinstance(c_type, Conditional) - c_type = c_type.parse(stream, values) if is_cond else c_type.parse(stream) + c_type = c_type.parse(stream, ctx) if is_cond else c_type.parse(stream) fields.append((name, c_type)) - values[name] = stream.read_ctype(c_type, direction=READ_BACKWARD) + if name in ctx: + ctx[name] = stream.read_ctype(c_type, direction=READ_BACKWARD) data_class = type( 'Struct',