From fccea12019b0e2c530ec55a305b1d974c91879f4 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Tue, 20 Jul 2021 16:53:07 +0300 Subject: [PATCH 01/16] IGNITE-15118 Implement handshake timeout. --- pyignite/aio_client.py | 32 +++- pyignite/client.py | 39 ++++- pyignite/connection/aio_connection.py | 9 +- pyignite/connection/connection.py | 22 ++- tests/common/test_query_listener.py | 4 +- tests/conftest.py | 5 - tests/custom/test_handshake_timeout.py | 197 +++++++++++++++++++++++++ 7 files changed, 290 insertions(+), 18 deletions(-) create mode 100644 tests/custom/test_handshake_timeout.py diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py index 083c964..7b4f863 100644 --- a/pyignite/aio_client.py +++ b/pyignite/aio_client.py @@ -73,7 +73,37 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, :param partition_aware: (optional) try to calculate the exact data placement from the key before to issue the key operation to the server node, `True` by default, - :param event_listeners: (optional) event listeners. + :param event_listeners: (optional) event listeners, + :param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection) + with node. Default is 10.0 seconds, + :param use_ssl: (optional) set to True if Ignite server uses SSL + on its binary connector. Defaults to use SSL when username + and password has been supplied, not to use SSL otherwise, + :param ssl_version: (optional) SSL version constant from standard + `ssl` module. Defaults to TLS v1.2, + :param ssl_ciphers: (optional) ciphers to use. If not provided, + `ssl` default ciphers are used, + :param ssl_cert_reqs: (optional) determines how the remote side + certificate is treated: + + * `ssl.CERT_NONE` − remote certificate is ignored (default), + * `ssl.CERT_OPTIONAL` − remote certificate will be validated, + if provided, + * `ssl.CERT_REQUIRED` − valid remote certificate is required, + + :param ssl_keyfile: (optional) a path to SSL key file to identify + local (client) party, + :param ssl_keyfile_password: (optional) password for SSL key file, + can be provided when key file is encrypted to prevent OpenSSL + password prompt, + :param ssl_certfile: (optional) a path to ssl certificate file + to identify local (client) party, + :param ssl_ca_certfile: (optional) a path to a trusted certificate + or a certificate chain. Required to check the validity of the remote + (server-side) certificate, + :param username: (optional) user name to authenticate to Ignite + cluster, + :param password: (optional) password to authenticate to Ignite cluster. """ super().__init__(compact_footer, partition_aware, event_listeners, **kwargs) self._registry_mux = asyncio.Lock() diff --git a/pyignite/client.py b/pyignite/client.py index e3dd71b..397c52e 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -346,6 +346,9 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, """ Initialize client. + For the use of the SSL-related parameters see + https://docs.python.org/3/library/ssl.html#ssl-certificates. + :param compact_footer: (optional) use compact (True, recommended) or full (False) schema approach when serializing Complex objects. Default is to use the same approach the server is using (None). @@ -354,7 +357,41 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, :param partition_aware: (optional) try to calculate the exact data placement from the key before to issue the key operation to the server node, `True` by default, - :param event_listeners: (optional) event listeners. + :param event_listeners: (optional) event listeners, + :param timeout: (optional) sets timeout (in seconds) for each socket + operation including `connect`. 0 means non-blocking mode, which is + virtually guaranteed to fail. Can accept integer or float value. + Default is None (blocking mode), + :param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection) + with node. Default is 10.0 seconds, + :param use_ssl: (optional) set to True if Ignite server uses SSL + on its binary connector. Defaults to use SSL when username + and password has been supplied, not to use SSL otherwise, + :param ssl_version: (optional) SSL version constant from standard + `ssl` module. Defaults to TLS v1.2, + :param ssl_ciphers: (optional) ciphers to use. If not provided, + `ssl` default ciphers are used, + :param ssl_cert_reqs: (optional) determines how the remote side + certificate is treated: + + * `ssl.CERT_NONE` − remote certificate is ignored (default), + * `ssl.CERT_OPTIONAL` − remote certificate will be validated, + if provided, + * `ssl.CERT_REQUIRED` − valid remote certificate is required, + + :param ssl_keyfile: (optional) a path to SSL key file to identify + local (client) party, + :param ssl_keyfile_password: (optional) password for SSL key file, + can be provided when key file is encrypted to prevent OpenSSL + password prompt, + :param ssl_certfile: (optional) a path to ssl certificate file + to identify local (client) party, + :param ssl_ca_certfile: (optional) a path to a trusted certificate + or a certificate chain. Required to check the validity of the remote + (server-side) certificate, + :param username: (optional) user name to authenticate to Ignite + cluster, + :param password: (optional) password to authenticate to Ignite cluster. """ super().__init__(compact_footer, partition_aware, event_listeners, **kwargs) diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index 89de49d..95e4efc 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -118,11 +118,13 @@ def __init__(self, client: 'AioClient', host: str, port: int, username: str = No :param client: Ignite client object, :param host: Ignite server node's host name or IP, :param port: Ignite server node's port number, + :param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection) + with node. Default is 10.0 seconds, :param use_ssl: (optional) set to True if Ignite server uses SSL on its binary connector. Defaults to use SSL when username and password has been supplied, not to use SSL otherwise, :param ssl_version: (optional) SSL version constant from standard - `ssl` module. Defaults to TLS v1.1, as in Ignite 2.5, + `ssl` module. Defaults to TLS v1.2, :param ssl_ciphers: (optional) ciphers to use. If not provided, `ssl` default ciphers are used, :param ssl_cert_reqs: (optional) determines how the remote side @@ -227,7 +229,10 @@ async def _connect_version(self) -> Union[dict, OrderedDict]: handshake_fut = self._loop.create_future() self._transport, _ = await self._loop.create_connection(lambda: BaseProtocol(self, handshake_fut), host=self.host, port=self.port, ssl=ssl_context) - hs_response = await handshake_fut + try: + hs_response = await asyncio.wait_for(handshake_fut, self.handshake_timeout) + except asyncio.exceptions.TimeoutError: + raise ConnectionError('timed out') if hs_response.op_code == 0: await self.close() diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index 2b9970a..57c16f2 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -19,7 +19,7 @@ from typing import Union from pyignite.constants import PROTOCOLS, IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER -from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError +from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError, ParameterError from .bitmask_feature import BitmaskFeature from .handshake import HandshakeRequest, HandshakeResponse @@ -34,14 +34,18 @@ class BaseConnection: def __init__(self, client, host: str = None, port: int = None, username: str = None, password: str = None, - **ssl_params): + handshake_timeout: float = 10.0, **ssl_params): self.client = client + self.handshake_timeout = handshake_timeout self.host = host if host else IGNITE_DEFAULT_HOST self.port = port if port else IGNITE_DEFAULT_PORT self.username = username self.password = password self.uuid = None + if handshake_timeout <= 0.0: + raise ParameterError("handshake_timeout should be positive") + check_ssl_params(ssl_params) if self.username and self.password and 'use_ssl' not in ssl_params: @@ -162,8 +166,9 @@ class Connection(BaseConnection): * binary protocol connector. Encapsulates handshake and failover reconnection. """ - def __init__(self, client: 'Client', host: str, port: int, timeout: float = None, - username: str = None, password: str = None, **ssl_params): + def __init__(self, client: 'Client', host: str, port: int, username: str = None, password: str = None, + timeout: float = None, handshake_timeout: float = 10.0, + **ssl_params): """ Initialize connection. @@ -177,11 +182,13 @@ def __init__(self, client: 'Client', host: str, port: int, timeout: float = None operation including `connect`. 0 means non-blocking mode, which is virtually guaranteed to fail. Can accept integer or float value. Default is None (blocking mode), + :param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection) + with node. Default is 10.0. :param use_ssl: (optional) set to True if Ignite server uses SSL on its binary connector. Defaults to use SSL when username and password has been supplied, not to use SSL otherwise, :param ssl_version: (optional) SSL version constant from standard - `ssl` module. Defaults to TLS v1.1, as in Ignite 2.5, + `ssl` module. Defaults to TLS v1.2, :param ssl_ciphers: (optional) ciphers to use. If not provided, `ssl` default ciphers are used, :param ssl_cert_reqs: (optional) determines how the remote side @@ -206,7 +213,7 @@ def __init__(self, client: 'Client', host: str, port: int, timeout: float = None cluster, :param password: (optional) password to authenticate to Ignite cluster. """ - super().__init__(client, host, port, username, password, **ssl_params) + super().__init__(client, host, port, username, password, handshake_timeout, **ssl_params) self.timeout = timeout self._socket = None @@ -245,6 +252,7 @@ def connect(self): self.client.protocol_context = None raise e + self._socket.settimeout(self.timeout) self._on_handshake_success(result) def _connect_version(self) -> Union[dict, OrderedDict]: @@ -254,7 +262,7 @@ def _connect_version(self) -> Union[dict, OrderedDict]: """ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.settimeout(self.timeout) + self._socket.settimeout(self.handshake_timeout) self._socket = wrap(self._socket, self.ssl_params) self._socket.connect((self.host, self.port)) diff --git a/tests/common/test_query_listener.py b/tests/common/test_query_listener.py index afff542..65ae478 100644 --- a/tests/common/test_query_listener.py +++ b/tests/common/test_query_listener.py @@ -93,7 +93,7 @@ def __assert_fail_events(client): assert ev.port == conn.port assert ev.node_uuid == str(conn.uuid if conn.uuid else '') assert 'Cache does not exist' in ev.err_msg - assert ev.duration > 0 + assert ev.duration >= 0 def test_query_success_events(client): @@ -124,4 +124,4 @@ def __assert_success_events(client): assert ev.host == conn.host assert ev.port == conn.port assert ev.node_uuid == str(conn.uuid if conn.uuid else '') - assert ev.duration > 0 + assert ev.duration >= 0 diff --git a/tests/conftest.py b/tests/conftest.py index 70995a2..6f92f0c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,11 +20,6 @@ logger = logging.getLogger('pyignite') logger.setLevel(logging.DEBUG) -handler = logging.StreamHandler(stream=sys.stdout) -handler.setFormatter( - logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s') -) -logger.addHandler(handler) @pytest.fixture(autouse=True) diff --git a/tests/custom/test_handshake_timeout.py b/tests/custom/test_handshake_timeout.py new file mode 100644 index 0000000..6913ee5 --- /dev/null +++ b/tests/custom/test_handshake_timeout.py @@ -0,0 +1,197 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import socket +import struct +import time +from concurrent.futures import ThreadPoolExecutor + +import pytest + +from pyignite import Client, AioClient +from pyignite import monitoring +from pyignite.exceptions import ReconnectError, ParameterError +from pyignite.monitoring import HandshakeFailedEvent + +logger = logging.getLogger('fake_ignite') +logger.setLevel(logging.DEBUG) + +DEFAULT_HOST = '127.0.0.1' +DEFAULT_PORT = 10800 + + +class FakeIgniteProtocol(asyncio.Protocol): + def __init__(self, server): + self._transport = None + self._server = server + self._buf = bytearray() + self._done_handshake = False + + def connection_made(self, transport): + sock = transport.get_extra_info('socket') + if sock is not None: + logger.debug('Connecting from %s', sock) + self._server.add_client(transport) + self._transport = transport + + def _handshake_response(self, error=True): + return struct.pack(' (1, 3, 0): + self._transport.write(self._handshake_response(True)) + self._transport.close() + else: + self._transport.write(self._handshake_response(False)) + self._done_handshake = True + self._buf = bytearray() + + +class FakeIgniteServer: + def __init__(self, do_handshake=False): + self.clients = [] + self.server = None + self.do_handshake = do_handshake + self.loop = asyncio.get_event_loop() + + async def start(self): + self.server = await self.loop.create_server(lambda: FakeIgniteProtocol(self), DEFAULT_HOST, DEFAULT_PORT) + await self.server.start_serving() + + def add_client(self, client): + self.clients.append(client) + + async def close(self): + for client in self.clients: + client.close() + + if self.server: + self.server.close() + await self.server.wait_closed() + + +class HandshakeTimeoutListener(monitoring.ConnectionEventListener): + def __init__(self): + self.events = [] + + def on_handshake_fail(self, event: HandshakeFailedEvent): + self.events.append(event) + + +@pytest.fixture +async def server(): + server = FakeIgniteServer() + try: + await server.start() + yield server + finally: + await server.close() + + +@pytest.fixture +async def server_with_handshake(): + server = FakeIgniteServer(do_handshake=True) + try: + await server.start() + yield server + finally: + await server.close() + + +@pytest.mark.asyncio +async def test_handshake_timeout(server, event_loop): + def sync_client_connect(): + hs_to_listener = HandshakeTimeoutListener() + client = Client(handshake_timeout=3.0, event_listeners=[hs_to_listener]) + start = time.monotonic() + try: + client.connect(DEFAULT_HOST, DEFAULT_PORT) + except Exception as e: + return time.monotonic() - start, hs_to_listener.events, e + return time.monotonic() - start, hs_to_listener.events, None + + duration, events, err = await event_loop.run_in_executor(ThreadPoolExecutor(), sync_client_connect) + + assert isinstance(err, ReconnectError) + assert 3.0 <= duration < 4.0 + assert len(events) > 0 + for ev in events: + assert isinstance(ev, HandshakeFailedEvent) + assert 'timed out' in ev.error_msg + + +@pytest.mark.asyncio +async def test_handshake_timeout_async(server): + hs_to_listener = HandshakeTimeoutListener() + client = AioClient(handshake_timeout=3.0, event_listeners=[hs_to_listener]) + with pytest.raises(ReconnectError): + start = time.monotonic() + await client.connect(DEFAULT_HOST, DEFAULT_PORT) + + assert 3.0 <= time.monotonic() - start < 4.0 + assert len(hs_to_listener.events) > 0 + for ev in hs_to_listener.events: + assert isinstance(ev, HandshakeFailedEvent) + assert 'timed out' in ev.error_msg + + +@pytest.mark.asyncio +async def test_socket_timeout_applied_sync(server_with_handshake, event_loop): + def sync_client_connect(): + hs_to_listener = HandshakeTimeoutListener() + client = Client(timeout=5.0, handshake_timeout=3.0, event_listeners=[hs_to_listener]) + start = time.monotonic() + try: + client.connect(DEFAULT_HOST, DEFAULT_PORT) + assert all(n.alive for n in client._nodes) + client.get_cache_names() + except Exception as e: + return time.monotonic() - start, hs_to_listener.events, e + return time.monotonic() - start, hs_to_listener.events, None + + duration, events, err = await event_loop.run_in_executor(ThreadPoolExecutor(), sync_client_connect) + + assert isinstance(err, socket.timeout) + assert 5.0 <= duration < 6.0 + assert len(events) == 0 + + +@pytest.mark.parametrize( + 'handshake_timeout', + [0.0, -10.0, -0.01] +) +@pytest.mark.asyncio +async def test_handshake_timeout_param_validation(handshake_timeout): + with pytest.raises(ParameterError): + await AioClient(handshake_timeout=handshake_timeout).connect(DEFAULT_HOST, DEFAULT_PORT) + + with pytest.raises(ParameterError): + Client(handshake_timeout=handshake_timeout).connect(DEFAULT_HOST, DEFAULT_PORT) From aece47ba67a7a72afccaa401ac9278b9464eecf4 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Tue, 20 Jul 2021 17:18:30 +0300 Subject: [PATCH 02/16] IGNITE-15118 Simplify logging --- tests/custom/test_handshake_timeout.py | 11 +++++++---- tox.ini | 7 ++++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/tests/custom/test_handshake_timeout.py b/tests/custom/test_handshake_timeout.py index 6913ee5..c670491 100644 --- a/tests/custom/test_handshake_timeout.py +++ b/tests/custom/test_handshake_timeout.py @@ -55,7 +55,7 @@ def _parse_handshake_request(self, buf): return struct.unpack(' (1, 3, 0): - self._transport.write(self._handshake_response(True)) + response = self._handshake_response(True) + logger.debug(f'Writing handshake response {response}') + self._transport.write(response) self._transport.close() else: - self._transport.write(self._handshake_response(False)) + response = self._handshake_response(False) + logger.debug(f'Writing handshake response {response}') + self._transport.write(response) self._done_handshake = True self._buf = bytearray() diff --git a/tox.ini b/tox.ini index 90153da..a47a504 100644 --- a/tox.ini +++ b/tox.ini @@ -17,6 +17,11 @@ skipsdist = True envlist = codestyle,py{36,37,38,39} +[pytest] +log_cli = True +log_cli_level = DEBUG +log_cli_format = '%(asctime)s %(name)s %(levelname)s %(message)s' + [flake8] max-line-length=120 ignore = F401,F403,F405,F821 @@ -34,7 +39,7 @@ deps = recreate = True usedevelop = True commands = - pytest {env:PYTESTARGS:} {posargs} --force-cext --examples + pytest {env:PYTESTARGS:} {posargs} --force-cext --examples --log-level=DEBUG [testenv:py{36,37,38,39}-jenkins] setenv: From bb36e185389a8ef9d815c9b6c9f41322a4325977 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Tue, 20 Jul 2021 17:55:34 +0300 Subject: [PATCH 03/16] IGNITE-15118 Remove excessive logging for passed tests --- tox.ini | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tox.ini b/tox.ini index a47a504..83b00b2 100644 --- a/tox.ini +++ b/tox.ini @@ -18,9 +18,11 @@ skipsdist = True envlist = codestyle,py{36,37,38,39} [pytest] -log_cli = True -log_cli_level = DEBUG -log_cli_format = '%(asctime)s %(name)s %(levelname)s %(message)s' +log_format = %(asctime)s %(name)s %(levelname)s %(message)s +log_date_format = %Y-%m-%d %H:%M:%S +# Uncomment if you want verbose logging for all tests (for failed it will be printed anyway). +# log_cli = True +# log_cli_level = DEBUG [flake8] max-line-length=120 From 154494207b3921594140be84b86c0038a9e8f712 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Tue, 20 Jul 2021 18:33:47 +0300 Subject: [PATCH 04/16] IGNITE-15118 Minor for docs. --- pyignite/aio_client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py index 7b4f863..b6ded74 100644 --- a/pyignite/aio_client.py +++ b/pyignite/aio_client.py @@ -65,6 +65,9 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, """ Initialize client. + For the use of the SSL-related parameters see + https://docs.python.org/3/library/ssl.html#ssl-certificates. + :param compact_footer: (optional) use compact (True, recommended) or full (False) schema approach when serializing Complex objects. Default is to use the same approach the server is using (None). From 0662dc71b2bf2179a83bcef5de7f10a05808da00 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Tue, 20 Jul 2021 18:37:05 +0300 Subject: [PATCH 05/16] IGNITE-15118 Minors for tox.ini --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 83b00b2..964b748 100644 --- a/tox.ini +++ b/tox.ini @@ -41,7 +41,7 @@ deps = recreate = True usedevelop = True commands = - pytest {env:PYTESTARGS:} {posargs} --force-cext --examples --log-level=DEBUG + pytest {env:PYTESTARGS:} {posargs} --force-cext --examples [testenv:py{36,37,38,39}-jenkins] setenv: From a0f2c409ebef0628d0b59674bf14da9abaa0e56f Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Tue, 20 Jul 2021 21:18:34 +0300 Subject: [PATCH 06/16] IGNITE-15118 Fix for py36 and py37 --- pyignite/connection/aio_connection.py | 2 +- tests/custom/test_handshake_timeout.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index 95e4efc..44a0265 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -231,7 +231,7 @@ async def _connect_version(self) -> Union[dict, OrderedDict]: host=self.host, port=self.port, ssl=ssl_context) try: hs_response = await asyncio.wait_for(handshake_fut, self.handshake_timeout) - except asyncio.exceptions.TimeoutError: + except asyncio.TimeoutError: raise ConnectionError('timed out') if hs_response.op_code == 0: diff --git a/tests/custom/test_handshake_timeout.py b/tests/custom/test_handshake_timeout.py index c670491..7d88637 100644 --- a/tests/custom/test_handshake_timeout.py +++ b/tests/custom/test_handshake_timeout.py @@ -87,7 +87,6 @@ def __init__(self, do_handshake=False): async def start(self): self.server = await self.loop.create_server(lambda: FakeIgniteProtocol(self), DEFAULT_HOST, DEFAULT_PORT) - await self.server.start_serving() def add_client(self, client): self.clients.append(client) From 0d0b108093d68fa650dae09b36dd84950c0c4966 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Tue, 20 Jul 2021 22:21:50 +0300 Subject: [PATCH 07/16] IGNITE-15118 Fix for py36 and py37 --- pyignite/connection/aio_connection.py | 2 +- tests/custom/test_handshake_timeout.py | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index 44a0265..a79925d 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -167,7 +167,6 @@ async def connect(self): """ if self.alive: return - self._closed = False await self._connect() async def _connect(self): @@ -227,6 +226,7 @@ async def _connect_version(self) -> Union[dict, OrderedDict]: ssl_context = create_ssl_context(self.ssl_params) handshake_fut = self._loop.create_future() + self._closed = False self._transport, _ = await self._loop.create_connection(lambda: BaseProtocol(self, handshake_fut), host=self.host, port=self.port, ssl=ssl_context) try: diff --git a/tests/custom/test_handshake_timeout.py b/tests/custom/test_handshake_timeout.py index 7d88637..f6d7e31 100644 --- a/tests/custom/test_handshake_timeout.py +++ b/tests/custom/test_handshake_timeout.py @@ -49,7 +49,10 @@ def connection_made(self, transport): self._transport = transport def _handshake_response(self, error=True): - return struct.pack(' Date: Wed, 21 Jul 2021 12:05:56 +0300 Subject: [PATCH 08/16] IGNITE-15118 Fix for old ignite versions --- examples/transactions.py | 6 +++++ pyignite/connection/aio_connection.py | 34 +++++++++++++------------ pyignite/connection/connection.py | 34 +++++++++++++------------ pyignite/connection/protocol_context.py | 3 +++ tests/common/test_query_listener.py | 14 +++++----- tests/custom/test_cluster.py | 2 +- tests/custom/test_connection_events.py | 6 ++--- tests/security/test_auth.py | 3 ++- 8 files changed, 58 insertions(+), 44 deletions(-) diff --git a/examples/transactions.py b/examples/transactions.py index 53da05f..53e9c30 100644 --- a/examples/transactions.py +++ b/examples/transactions.py @@ -130,6 +130,12 @@ def sync_example(): if __name__ == '__main__': + client = Client() + with client.connect('127.0.0.1', 10800): + if not client.protocol_context.is_transactions_supported(): + print("'Transactions' API is not supported by cluster. Finishing...") + exit(0) + print("Starting sync example") sync_example() diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index a79925d..2866235 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -177,25 +177,27 @@ async def _connect(self): detecting_protocol = True self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported()) - try: - self._on_handshake_start() - result = await self._connect_version() - except HandshakeError as e: - if e.expected_version in PROTOCOLS: - self.client.protocol_context.version = e.expected_version + while True: + try: + self._on_handshake_start() result = await self._connect_version() - else: + break + except HandshakeError as e: + if e.expected_version in PROTOCOLS: + self.client.protocol_context.version = e.expected_version + continue + else: + self._on_handshake_fail(e) + raise e + except AuthenticationError as e: self._on_handshake_fail(e) raise e - except AuthenticationError as e: - self._on_handshake_fail(e) - raise e - except Exception as e: - self._on_handshake_fail(e) - # restore undefined protocol version - if detecting_protocol: - self.client.protocol_context = None - raise e + except Exception as e: + self._on_handshake_fail(e) + # restore undefined protocol version + if detecting_protocol: + self.client.protocol_context = None + raise e self._on_handshake_success(result) diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index 57c16f2..57c63e1 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -232,25 +232,27 @@ def connect(self): detecting_protocol = True self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported()) - try: - self._on_handshake_start() - result = self._connect_version() - except HandshakeError as e: - if e.expected_version in PROTOCOLS: - self.client.protocol_context.version = e.expected_version + while True: + try: + self._on_handshake_start() result = self._connect_version() - else: + break + except HandshakeError as e: + if e.expected_version in PROTOCOLS: + self.client.protocol_context.version = e.expected_version + continue + else: + self._on_handshake_fail(e) + raise e + except AuthenticationError as e: self._on_handshake_fail(e) raise e - except AuthenticationError as e: - self._on_handshake_fail(e) - raise e - except Exception as e: - self._on_handshake_fail(e) - # restore undefined protocol version - if detecting_protocol: - self.client.protocol_context = None - raise e + except Exception as e: + self._on_handshake_fail(e) + # restore undefined protocol version + if detecting_protocol: + self.client.protocol_context = None + raise e self._socket.settimeout(self.timeout) self._on_handshake_success(result) diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py index ba6d9e4..f60d45b 100644 --- a/pyignite/connection/protocol_context.py +++ b/pyignite/connection/protocol_context.py @@ -40,6 +40,9 @@ def __eq__(self, other): def __str__(self): return f'ProtocolContext(version={self._version}, features={self._features})' + def __repr__(self): + return self.__str__() + def _ensure_consistency(self): if not self.is_feature_flags_supported(): self._features = None diff --git a/tests/common/test_query_listener.py b/tests/common/test_query_listener.py index 65ae478..8310117 100644 --- a/tests/common/test_query_listener.py +++ b/tests/common/test_query_listener.py @@ -17,7 +17,7 @@ from pyignite import Client, AioClient from pyignite.exceptions import CacheError from pyignite.monitoring import QueryEventListener, QueryStartEvent, QueryFailEvent, QuerySuccessEvent -from pyignite.queries.op_codes import OP_CACHE_PUT, OP_CACHE_PARTITIONS, OP_CLUSTER_GET_STATE +from pyignite.queries.op_codes import OP_CACHE_PUT, OP_CACHE_PARTITIONS, OP_CACHE_GET_NAMES events = [] @@ -97,13 +97,13 @@ def __assert_fail_events(client): def test_query_success_events(client): - client.get_cluster().get_state() + client.get_cache_names() __assert_success_events(client) @pytest.mark.asyncio async def test_query_success_events_async(async_client): - await async_client.get_cluster().get_state() + await async_client.get_cache_names() __assert_success_events(async_client) @@ -112,15 +112,15 @@ def __assert_success_events(client): conn = client._nodes[0] for ev in events: if isinstance(ev, QueryStartEvent): - assert ev.op_code == OP_CLUSTER_GET_STATE - assert ev.op_name == 'OP_CLUSTER_GET_STATE' + assert ev.op_code == OP_CACHE_GET_NAMES + assert ev.op_name == 'OP_CACHE_GET_NAMES' assert ev.host == conn.host assert ev.port == conn.port assert ev.node_uuid == str(conn.uuid if conn.uuid else '') if isinstance(ev, QuerySuccessEvent): - assert ev.op_code == OP_CLUSTER_GET_STATE - assert ev.op_name == 'OP_CLUSTER_GET_STATE' + assert ev.op_code == OP_CACHE_GET_NAMES + assert ev.op_name == 'OP_CACHE_GET_NAMES' assert ev.host == conn.host assert ev.port == conn.port assert ev.node_uuid == str(conn.uuid if conn.uuid else '') diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index e94853a..ae83ecd 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -49,7 +49,7 @@ def cluster_api_supported(request, server1): client = Client() with client.connect('127.0.0.1', 10801): if not client.protocol_context.is_cluster_api_supported(): - pytest.skip(f'skipped {request.node.name}, ExpiryPolicy APIis not supported.') + pytest.skip(f'skipped {request.node.name}, Cluster API is not supported.') def test_cluster_set_active(with_persistence): diff --git a/tests/custom/test_connection_events.py b/tests/custom/test_connection_events.py index bee9395..9ece9a8 100644 --- a/tests/custom/test_connection_events.py +++ b/tests/custom/test_connection_events.py @@ -104,7 +104,7 @@ async def test_events_async(request, server2): def __assert_events(nodes, protocol_context): assert len([e for e in events if isinstance(e, ConnectionLostEvent)]) == 1 # ConnectionLostEvent is a subclass of ConnectionClosedEvent - assert len([e for e in events if type(e) == ConnectionClosedEvent]) == 1 + assert len([e for e in events if type(e) == ConnectionClosedEvent and e.node_uuid]) == 1 assert len([e for e in events if isinstance(e, HandshakeSuccessEvent)]) == 2 for ev in events: @@ -114,7 +114,6 @@ def __assert_events(nodes, protocol_context): assert ev.node_uuid == str(nodes[ev.port].uuid) assert ev.error_msg elif isinstance(ev, HandshakeStartEvent): - assert ev.protocol_context == protocol_context assert ev.port in {10801, 10802} elif isinstance(ev, HandshakeFailedEvent): assert ev.port == 10802 @@ -126,4 +125,5 @@ def __assert_events(nodes, protocol_context): assert ev.protocol_context == protocol_context elif isinstance(ev, ConnectionClosedEvent): assert ev.port == 10801 - assert ev.node_uuid == str(nodes[ev.port].uuid) + if ev.node_uuid: # Possible if protocol negotiation occurred. + assert ev.node_uuid == str(nodes[ev.port].uuid) diff --git a/tests/security/test_auth.py b/tests/security/test_auth.py index 503cf88..83ac780 100644 --- a/tests/security/test_auth.py +++ b/tests/security/test_auth.py @@ -95,7 +95,8 @@ def __assert_successful_connect_events(conn, listener): assert ev.host == conn.host assert ev.port == conn.port if isinstance(ev, (HandshakeSuccessEvent, ConnectionClosedEvent)): - assert ev.node_uuid == str(conn.uuid if conn.uuid else '') + if ev.node_uuid: + assert ev.node_uuid == str(conn.uuid) if isinstance(ev, HandshakeSuccessEvent): assert ev.protocol_context From c60f8e2bad3eaf3f059b8b2dc33e28c2da96d7c4 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Wed, 21 Jul 2021 12:08:00 +0300 Subject: [PATCH 09/16] IGNITE-15118 Add travis wait --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 74909b8..7e98b48 100644 --- a/.travis.yml +++ b/.travis.yml @@ -51,4 +51,4 @@ jobs: env: TOXENV=py39 install: pip install tox -script: tox \ No newline at end of file +script: travis_wait 30 tox \ No newline at end of file From 58e543315f7a06373dc0c14af0e950d6b8810098 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Wed, 21 Jul 2021 12:08:32 +0300 Subject: [PATCH 10/16] IGNITE-15118 Add travis wait --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 7e98b48..3fef24d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -51,4 +51,4 @@ jobs: env: TOXENV=py39 install: pip install tox -script: travis_wait 30 tox \ No newline at end of file +script: travis_wait 30 tox From 5d90d09e21e6fd2d4bcbab5631f899823e244a66 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Wed, 21 Jul 2021 12:35:43 +0300 Subject: [PATCH 11/16] IGNITE-15118 Preventing possible hanging of test --- tests/custom/test_connection_events.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/tests/custom/test_connection_events.py b/tests/custom/test_connection_events.py index 9ece9a8..c7bbed0 100644 --- a/tests/custom/test_connection_events.py +++ b/tests/custom/test_connection_events.py @@ -12,11 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import random - import pytest from pyignite import Client, AioClient +from pyignite.datatypes.cache_config import CacheMode +from pyignite.datatypes.prop_codes import PROP_NAME, PROP_CACHE_MODE from pyignite.monitoring import ConnectionEventListener, ConnectionLostEvent, ConnectionClosedEvent, \ HandshakeSuccessEvent, HandshakeFailedEvent, HandshakeStartEvent @@ -65,12 +65,16 @@ def test_events(request, server2): with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]): protocol_context = client.protocol_context nodes = {conn.port: conn for conn in client._nodes} - cache = client.get_or_create_cache(request.node.name) + cache = client.get_or_create_cache({ + PROP_NAME: request.node.name, + PROP_CACHE_MODE: CacheMode.REPLICATED, + }) + kill_process_tree(server2.pid) - while True: + for _ in range(0, 100): try: - cache.put(random.randint(0, 1000), 1) + cache.put(1, 1) except: # noqa 13 pass @@ -86,12 +90,15 @@ async def test_events_async(request, server2): async with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]): protocol_context = client.protocol_context nodes = {conn.port: conn for conn in client._nodes} - cache = await client.get_or_create_cache(request.node.name) + cache = await client.get_or_create_cache({ + PROP_NAME: request.node.name, + PROP_CACHE_MODE: CacheMode.REPLICATED, + }) kill_process_tree(server2.pid) - while True: + for _ in range(0, 100): try: - await cache.put(random.randint(0, 1000), 1) + await cache.put(1, 1) except: # noqa 13 pass From c81b498605cc6f6ec3b9f8620647ccfd456f5ed7 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Wed, 21 Jul 2021 12:40:49 +0300 Subject: [PATCH 12/16] IGNITE-15118 Rename test to more meaningful name --- tests/custom/test_handshake_timeout.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/custom/test_handshake_timeout.py b/tests/custom/test_handshake_timeout.py index f6d7e31..bae184d 100644 --- a/tests/custom/test_handshake_timeout.py +++ b/tests/custom/test_handshake_timeout.py @@ -190,7 +190,7 @@ def sync_client_connect(): @pytest.mark.asyncio -async def test_socket_timeout_applied_async(server_with_handshake, event_loop): +async def test_handshake_timeout_not_affected_for_others_requests_async(server_with_handshake): hs_to_listener = HandshakeTimeoutListener() client = AioClient(handshake_timeout=3.0, event_listeners=[hs_to_listener]) with pytest.raises(asyncio.TimeoutError): From b4db37600f99ca6bef0968f3e8c176fad7652382 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Wed, 21 Jul 2021 13:57:56 +0300 Subject: [PATCH 13/16] IGNITE-15118 Fix warning on python 3.7 --- pyignite/transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyignite/transaction.py b/pyignite/transaction.py index eb77f8d..3003eb6 100644 --- a/pyignite/transaction.py +++ b/pyignite/transaction.py @@ -23,7 +23,7 @@ def _validate_int_enum_param(value: Union[int, IntEnum], cls: Type[IntEnum]): - if value not in cls: + if value not in set(v.value for v in cls): # Use this trick to disable warning on python 3.7 raise ValueError(f'{value} not in {cls}') return value From e7975160f38b3a052d34851474efb3b741de8034 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Wed, 21 Jul 2021 17:13:50 +0300 Subject: [PATCH 14/16] IGNITE-15118 Revert travis wait --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 3fef24d..2cd3e2b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -51,4 +51,4 @@ jobs: env: TOXENV=py39 install: pip install tox -script: travis_wait 30 tox +script: tox From ce37dee6cfcec66384649972a9858d2a67b1c138 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Wed, 21 Jul 2021 18:22:00 +0300 Subject: [PATCH 15/16] IGNITE-15118 Minor --- pyignite/connection/aio_connection.py | 5 ++--- pyignite/connection/connection.py | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index 2866235..4d13d6e 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -181,7 +181,8 @@ async def _connect(self): try: self._on_handshake_start() result = await self._connect_version() - break + self._on_handshake_success(result) + return except HandshakeError as e: if e.expected_version in PROTOCOLS: self.client.protocol_context.version = e.expected_version @@ -199,8 +200,6 @@ async def _connect(self): self.client.protocol_context = None raise e - self._on_handshake_success(result) - def process_connection_lost(self, err, reconnect=False): self.failed = True for _, fut in self._pending_reqs.items(): diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index 57c63e1..98ba7e0 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -236,7 +236,9 @@ def connect(self): try: self._on_handshake_start() result = self._connect_version() - break + self._socket.settimeout(self.timeout) + self._on_handshake_success(result) + return except HandshakeError as e: if e.expected_version in PROTOCOLS: self.client.protocol_context.version = e.expected_version @@ -254,9 +256,6 @@ def connect(self): self.client.protocol_context = None raise e - self._socket.settimeout(self.timeout) - self._on_handshake_success(result) - def _connect_version(self) -> Union[dict, OrderedDict]: """ Connect to the given server node using protocol version From 2e4182ca9d3d57c6ef95e9ce1314694d9d5fff8d Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Wed, 21 Jul 2021 20:53:01 +0300 Subject: [PATCH 16/16] IGNITE-15118 Change some asserts intervals --- tests/custom/test_connection_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/custom/test_connection_events.py b/tests/custom/test_connection_events.py index c7bbed0..f49ad61 100644 --- a/tests/custom/test_connection_events.py +++ b/tests/custom/test_connection_events.py @@ -111,7 +111,7 @@ async def test_events_async(request, server2): def __assert_events(nodes, protocol_context): assert len([e for e in events if isinstance(e, ConnectionLostEvent)]) == 1 # ConnectionLostEvent is a subclass of ConnectionClosedEvent - assert len([e for e in events if type(e) == ConnectionClosedEvent and e.node_uuid]) == 1 + assert 1 <= len([e for e in events if type(e) == ConnectionClosedEvent and e.node_uuid]) <= 2 assert len([e for e in events if isinstance(e, HandshakeSuccessEvent)]) == 2 for ev in events: @@ -131,6 +131,6 @@ def __assert_events(nodes, protocol_context): assert ev.node_uuid == str(nodes[ev.port].uuid) assert ev.protocol_context == protocol_context elif isinstance(ev, ConnectionClosedEvent): - assert ev.port == 10801 + assert ev.port in {10801, 10802} if ev.node_uuid: # Possible if protocol negotiation occurred. assert ev.node_uuid == str(nodes[ev.port].uuid)