From e2851d2ebf133d5531d48a7e30ad6ed2a487e528 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Thu, 15 Jul 2021 00:55:16 +0300 Subject: [PATCH 1/6] IGNITE-15102 Implement connection events. --- pyignite/aio_client.py | 10 +- pyignite/client.py | 15 +- pyignite/connection/connection.py | 27 +++- pyignite/connection/protocol_context.py | 3 + pyignite/monitoring.py | 175 ++++++++++++++++++++++++ tests/security/conftest.py | 24 ++++ tests/security/test_auth.py | 54 ++++++-- tests/security/test_ssl.py | 28 +++- 8 files changed, 309 insertions(+), 27 deletions(-) create mode 100644 pyignite/monitoring.py diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py index 0bb2b8c..eb47c53 100644 --- a/pyignite/aio_client.py +++ b/pyignite/aio_client.py @@ -16,7 +16,7 @@ import random import sys from itertools import chain -from typing import Iterable, Type, Union, Any, Dict, Optional +from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence from .aio_cluster import AioCluster from .api import cache_get_node_partitions_async @@ -60,7 +60,8 @@ class AioClient(BaseClient): Asynchronous Client implementation. """ - def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs): + def __init__(self, compact_footer: bool = None, partition_aware: bool = True, + event_listeners: Optional[Sequence] = None, **kwargs): """ Initialize client. @@ -71,9 +72,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, ** https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema :param partition_aware: (optional) try to calculate the exact data placement from the key before to issue the key operation to the - server node, `True` by default. + server node, `True` by default, + :param event_listeners: (optional) event listeners. 
""" - super().__init__(compact_footer, partition_aware, **kwargs) + super().__init__(compact_footer, partition_aware, event_listeners, **kwargs) self._registry_mux = asyncio.Lock() self._affinity_query_mux = asyncio.Lock() diff --git a/pyignite/client.py b/pyignite/client.py index 6a499a3..3be7a8f 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -44,7 +44,7 @@ import random import re from itertools import chain -from typing import Iterable, Type, Union, Any, Dict, Optional +from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence from .api import cache_get_node_partitions from .api.binary import get_binary_type, put_binary_type @@ -66,6 +66,7 @@ get_field_by_id, unsigned ) from .binary import GenericObjectMeta +from .monitoring import _EventListeners __all__ = ['Client'] @@ -76,7 +77,8 @@ class BaseClient: _identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE) _ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE) - def __init__(self, compact_footer: bool = None, partition_aware: bool = False, **kwargs): + def __init__(self, compact_footer: bool = None, partition_aware: bool = False, + event_listeners: Optional[Sequence] = None, **kwargs): self._compact_footer = compact_footer self._partition_aware = partition_aware self._connection_args = kwargs @@ -87,6 +89,7 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, * self.affinity_version = (0, 0) self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)} self._protocol_context = None + self._event_listeners = _EventListeners(event_listeners) @property def protocol_context(self): @@ -338,7 +341,8 @@ class Client(BaseClient): Synchronous Client implementation. """ - def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs): + def __init__(self, compact_footer: bool = None, partition_aware: bool = True, + event_listeners: Optional[Sequence] = None, **kwargs): """ Initialize client. 
@@ -349,9 +353,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, ** https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema :param partition_aware: (optional) try to calculate the exact data placement from the key before to issue the key operation to the - server node, `True` by default. + server node, `True` by default, + :param event_listeners: (optional) event listeners. """ - super().__init__(compact_footer, partition_aware, **kwargs) + super().__init__(compact_footer, partition_aware, event_listeners, **kwargs) def connect(self, *args): """ diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index ae5587a..f4d38b6 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -100,6 +100,8 @@ def _on_handshake_start(self): if logger.isEnabledFor(logging.DEBUG): logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s", self.host, self.port, self.client.protocol_context) + if self._enabled_connection_listener: + self._connection_listener.publish_handshake_start(self.host, self.port, self.protocol_context) def _on_handshake_success(self, result): features = BitmaskFeature.from_array(result.get('features', None)) @@ -110,23 +112,42 @@ def _on_handshake_success(self, result): if logger.isEnabledFor(logging.DEBUG): logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s", self.host, self.port, self.uuid, self.client.protocol_context) + if self._enabled_connection_listener: + self._connection_listener.publish_handshake_success(self.host, self.port, self.protocol_context, self.uuid) def _on_handshake_fail(self, err): if isinstance(err, AuthenticationError): logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s", self.host, self.port, err) + if self._enabled_connection_listener: + self._connection_listener.publish_authentication_fail(self.host, self.port, 
self.protocol_context, err) else: logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) " "with protocol context %s failed: %s", self.host, self.port, self.client.protocol_context, err, exc_info=True) + if self._enabled_connection_listener: + self._connection_listener.publish_handshake_fail(self.host, self.port, self.protocol_context, err) def _on_connection_lost(self, err=None, expected=False): - if expected and logger.isEnabledFor(logging.DEBUG): - logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)", - self.host, self.port, self.uuid) + if expected: + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)", + self.host, self.port, self.uuid) + if self._enabled_connection_listener: + self._connection_listener.publish_connection_closed(self.host, self.port, self.uuid) else: logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s", self.host, self.port, self.uuid, err) + if self._enabled_connection_listener: + self._connection_listener.publish_connection_lost(self.host, self.port, self.uuid, err) + + @property + def _enabled_connection_listener(self): + return self.client._event_listeners and self.client._event_listeners.enabled_connection_listener + + @property + def _connection_listener(self): + return self.client._event_listeners class Connection(BaseConnection): diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py index 58f509e..ba6d9e4 100644 --- a/pyignite/connection/protocol_context.py +++ b/pyignite/connection/protocol_context.py @@ -44,6 +44,9 @@ def _ensure_consistency(self): if not self.is_feature_flags_supported(): self._features = None + def copy(self): + return ProtocolContext(self.version, self.features) + @property def version(self): return getattr(self, '_version', None) diff --git a/pyignite/monitoring.py b/pyignite/monitoring.py new file mode 100644 index 0000000..7190fc7 
--- /dev/null +++ b/pyignite/monitoring.py @@ -0,0 +1,175 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Sequence + + +class _ConnectionEvent: + __slots__ = ('host', 'port') + host: str + port: int + + def __init__(self, host, port): + object.__setattr__(self, 'host', host) + object.__setattr__(self, 'port', port) + + def __repr__(self): + pass + + def __setattr__(self, name, value): + raise TypeError(f'{self.__class__.__name__} is immutable') + + +class _HandshakeEvent(_ConnectionEvent): + __slots__ = ('protocol_context',) + protocol_context: Optional['ProtocolContext'] + + def __init__(self, host, port, protocol_context=None): + super().__init__(host, port) + object.__setattr__(self, 'protocol_context', + protocol_context.copy() if protocol_context else None) + + def __repr__(self): + return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ + f"protocol_context={self.protocol_context})" + + +class HandshakeStartEvent(_HandshakeEvent): + def __init__(self, host, port, protocol_context=None): + super().__init__(host, port, protocol_context) + + +class HandshakeFailedEvent(_HandshakeEvent): + __slots__ = ('error_msg',) + error_msg: str + + def __init__(self, host, port, 
protocol_context=None, err=None): + super().__init__(host, port, protocol_context) + object.__setattr__(self, 'error_msg', str(err) if err else '') + + def __repr__(self): + return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ + f"protocol_context={self.protocol_context}, error_msg={self.error_msg})" + + +class AuthenticationFailedEvent(HandshakeFailedEvent): + pass + + +class HandshakeSuccessEvent(_HandshakeEvent): + __slots__ = ('node_uuid',) + node_uuid: str + + def __init__(self, host, port, protocol_context, node_uuid): + super().__init__(host, port, protocol_context) + object.__setattr__(self, 'node_uuid', str(node_uuid) if node_uuid else '') + + def __repr__(self): + return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ + f"node_uuid={self.node_uuid}, protocol_context={self.protocol_context})" + + +class ConnectionClosedEvent(_ConnectionEvent): + __slots__ = ('node_uuid',) + node_uuid: str + + def __init__(self, host, port, node_uuid): + super().__init__(host, port) + object.__setattr__(self, 'node_uuid', str(node_uuid) if node_uuid else '') + + def __repr__(self): + return f"{self.__class__.__name__}(host={self.host}, port={self.port}, node_uuid={self.node_uuid})" + + +class ConnectionLostEvent(ConnectionClosedEvent): + __slots__ = ('error_msg',) + node_uuid: str + error_msg: str + + def __init__(self, host, port, node_uuid, err): + super().__init__(host, port, node_uuid) + object.__setattr__(self, 'error_msg', str(err) if err else '') + + def __repr__(self): + return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ + f"node_uuid={self.node_uuid}, error_msg={self.error_msg})" + + +class _EventListener: + pass + + +class ConnectionEventListener(_EventListener): + def on_handshake_start(self, event: HandshakeStartEvent): + pass + + def on_handshake_success(self, event: HandshakeSuccessEvent): + pass + + def on_handshake_fail(self, event: HandshakeFailedEvent): + pass + + def 
on_authentication_fail(self, event: AuthenticationFailedEvent): + pass + + def on_connection_closed(self, event: ConnectionClosedEvent): + pass + + def on_connection_lost(self, event: ConnectionLostEvent): + pass + + +class _EventListeners: + def __init__(self, listeners: Optional[Sequence]): + self.__connection_listeners = [] + if listeners: + for listener in listeners: + if isinstance(listener, ConnectionEventListener): + self.__connection_listeners.append(listener) + + @property + def enabled_connection_listener(self): + return bool(self.__connection_listeners) + + def publish_handshake_start(self, host, port, protocol_context): + evt = HandshakeStartEvent(host, port, protocol_context) + self.__publish_connection_events(lambda listener: listener.on_handshake_start(evt)) + + def publish_handshake_success(self, host, port, protocol_context, node_uuid): + evt = HandshakeSuccessEvent(host, port, protocol_context, node_uuid) + self.__publish_connection_events(lambda listener: listener.on_handshake_success(evt)) + + def publish_handshake_fail(self, host, port, protocol_context, err): + evt = HandshakeFailedEvent(host, port, protocol_context, err) + self.__publish_connection_events(lambda listener: listener.on_handshake_fail(evt)) + + def publish_authentication_fail(self, host, port, protocol_context, err): + evt = AuthenticationFailedEvent(host, port, protocol_context, err) + self.__publish_connection_events(lambda listener: listener.on_authentication_fail(evt)) + + def publish_connection_closed(self, host, port, node_uuid): + evt = ConnectionClosedEvent(host, port, node_uuid) + self.__publish_connection_events(lambda listener: listener.on_connection_closed(evt)) + + def publish_connection_lost(self, host, port, node_uuid, err): + evt = ConnectionLostEvent(host, port, node_uuid, err) + self.__publish_connection_events(lambda listener: listener.on_connection_lost(evt)) + + def __publish_connection_events(self, callback): + try: + for listener in 
self.__connection_listeners: + callback(listener) + except: # noqa: 13 + pass diff --git a/tests/security/conftest.py b/tests/security/conftest.py index d5de5a1..8845c31 100644 --- a/tests/security/conftest.py +++ b/tests/security/conftest.py @@ -16,6 +16,7 @@ import pytest +from pyignite import monitoring from tests.util import get_test_dir @@ -47,3 +48,26 @@ def __create_ssl_param(with_password=False): 'ssl_certfile': cert, 'ssl_ca_certfile': cert } + + +class AccumulatingConnectionListener(monitoring.ConnectionEventListener): + def __init__(self): + self.events = [] + + def on_handshake_start(self, event): + self.events.append(event) + + def on_handshake_success(self, event): + self.events.append(event) + + def on_handshake_fail(self, event): + self.events.append(event) + + def on_authentication_fail(self, event): + self.events.append(event) + + def on_connection_closed(self, event): + self.events.append(event) + + def on_connection_lost(self, event): + self.events.append(event) diff --git a/tests/security/test_auth.py b/tests/security/test_auth.py index 3586c91..e0dcaa4 100644 --- a/tests/security/test_auth.py +++ b/tests/security/test_auth.py @@ -18,7 +18,9 @@ import pytest from pyignite import Client, AioClient +from pyignite import monitoring from pyignite.exceptions import AuthenticationError +from tests.security.conftest import AccumulatingConnectionListener from tests.util import start_ignite_gen, clear_ignite_work_dir DEFAULT_IGNITE_USERNAME = 'ignite' @@ -44,23 +46,29 @@ def cleanup(): def test_auth_success(with_ssl, ssl_params, caplog): ssl_params['use_ssl'] = with_ssl - client = Client(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, **ssl_params) + listener = AccumulatingConnectionListener() + client = Client(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, + event_listeners=[listener], **ssl_params) with caplog.at_level(logger='pyignite', level=logging.DEBUG): with client.connect("127.0.0.1", 10801): assert 
all(node.alive for node in client._nodes) __assert_successful_connect_log(caplog) + __assert_successful_connect_events(listener) @pytest.mark.asyncio async def test_auth_success_async(with_ssl, ssl_params, caplog): ssl_params['use_ssl'] = with_ssl - client = AioClient(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, **ssl_params) + listener = AccumulatingConnectionListener() + client = AioClient(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, + event_listeners=[listener], **ssl_params) with caplog.at_level(logger='pyignite', level=logging.DEBUG): async with client.connect("127.0.0.1", 10801): assert all(node.alive for node in client._nodes) __assert_successful_connect_log(caplog) + __assert_successful_connect_events(listener) def __assert_successful_connect_log(caplog): @@ -70,6 +78,19 @@ def __assert_successful_connect_log(caplog): for r in caplog.records) +def __assert_successful_connect_events(listener): + event_classes = (monitoring.HandshakeStartEvent, monitoring.HandshakeSuccessEvent, + monitoring.ConnectionClosedEvent) + + for cls in event_classes: + any(isinstance(ev, cls) for ev in listener.events) + + for ev in listener.events: + if isinstance(ev, event_classes): + assert ev.host == "127.0.0.1" + assert ev.port == 10801 + + auth_failed_params = [ [DEFAULT_IGNITE_USERNAME, None], ['invalid_user', 'invalid_password'], @@ -83,13 +104,15 @@ def __assert_successful_connect_log(caplog): ) def test_auth_failed(username, password, with_ssl, ssl_params, caplog): ssl_params['use_ssl'] = with_ssl - + listener = AccumulatingConnectionListener() with pytest.raises(AuthenticationError): - client = Client(username=username, password=password, **ssl_params) + client = Client(username=username, password=password, + event_listeners=[listener], **ssl_params) with client.connect("127.0.0.1", 10801): pass - __assert_auth_failed_log(caplog) + __assert_auth_failed_log(caplog) + __assert_auth_failed_listener(listener) 
@pytest.mark.parametrize( @@ -99,15 +122,28 @@ def test_auth_failed(username, password, with_ssl, ssl_params, caplog): @pytest.mark.asyncio async def test_auth_failed_async(username, password, with_ssl, ssl_params, caplog): ssl_params['use_ssl'] = with_ssl - + listener = AccumulatingConnectionListener() with pytest.raises(AuthenticationError): - client = AioClient(username=username, password=password, **ssl_params) + client = AioClient(username=username, password=password, + event_listeners=[listener], **ssl_params) async with client.connect("127.0.0.1", 10801): pass - __assert_auth_failed_log(caplog) + __assert_auth_failed_log(caplog) + __assert_auth_failed_listener(listener) def __assert_auth_failed_log(caplog): pattern = r'Authentication failed while connecting to node\(address=127.0.0.1,\s+port=10801' - assert any(re.match(pattern, r.message) and r.levelname == logging.ERROR for r in caplog.records) + assert any(re.match(pattern, r.message) and r.levelname == logging.getLevelName(logging.ERROR) + for r in caplog.records) + + +def __assert_auth_failed_listener(listener): + found = False + for ev in listener.events: + if isinstance(ev, monitoring.AuthenticationFailedEvent): + found = True + assert ev.host == '127.0.0.1' + assert ev.port == 10801 + assert found diff --git a/tests/security/test_ssl.py b/tests/security/test_ssl.py index 2cbed4b..7a80871 100644 --- a/tests/security/test_ssl.py +++ b/tests/security/test_ssl.py @@ -17,8 +17,9 @@ import pytest -from pyignite import Client, AioClient +from pyignite import Client, AioClient, monitoring from pyignite.exceptions import ReconnectError +from tests.security.conftest import AccumulatingConnectionListener from tests.util import start_ignite_gen, get_or_create_cache, get_or_create_cache_async @@ -76,25 +77,40 @@ async def inner_async(): @pytest.mark.parametrize('invalid_ssl_params', invalid_params) def test_connection_error_with_incorrect_config(invalid_ssl_params, caplog): + listener = 
AccumulatingConnectionListener() with pytest.raises(ReconnectError): - client = Client(**invalid_ssl_params) + client = Client(event_listeners=[listener], **invalid_ssl_params) with client.connect([("127.0.0.1", 10801)]): pass - __assert_handshake_failed_log(caplog) + __assert_handshake_failed_log(caplog) + __assert_handshake_failed_listener(listener) @pytest.mark.parametrize('invalid_ssl_params', invalid_params) @pytest.mark.asyncio async def test_connection_error_with_incorrect_config_async(invalid_ssl_params, caplog): + listener = AccumulatingConnectionListener() with pytest.raises(ReconnectError): - client = AioClient(**invalid_ssl_params) + client = AioClient(event_listeners=[listener], **invalid_ssl_params) async with client.connect([("127.0.0.1", 10801)]): pass - __assert_handshake_failed_log(caplog) + __assert_handshake_failed_log(caplog) + __assert_handshake_failed_listener(listener) def __assert_handshake_failed_log(caplog): pattern = r'Failed to perform handshake, connection to node\(address=127.0.0.1,\s+port=10801.*failed:' - assert any(re.match(pattern, r.message) and r.levelname == logging.ERROR for r in caplog.records) + assert any(re.match(pattern, r.message) and r.levelname == logging.getLevelName(logging.ERROR) + for r in caplog.records) + + +def __assert_handshake_failed_listener(listener): + found = False + for ev in listener.events: + if isinstance(ev, monitoring.HandshakeFailedEvent): + found = True + assert ev.host == '127.0.0.1' + assert ev.port == 10801 + assert found From ed8b1db1499da2fe0cb9096a07a5df815229fb48 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Thu, 15 Jul 2021 16:06:39 +0300 Subject: [PATCH 2/6] IGNITE-15102 Implement query listener and tests. 
--- pyignite/aio_client.py | 7 +- pyignite/client.py | 9 +- pyignite/connection/aio_connection.py | 2 +- pyignite/connection/connection.py | 17 ++- pyignite/monitoring.py | 143 ++++++++++++++---- pyignite/queries/query.py | 23 ++- .../affinity/test_affinity_request_routing.py | 103 +++++-------- tests/common/test_query_listener.py | 127 ++++++++++++++++ tests/security/test_auth.py | 41 +++-- tests/security/test_ssl.py | 1 + 10 files changed, 347 insertions(+), 126 deletions(-) create mode 100644 tests/common/test_query_listener.py diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py index eb47c53..083c964 100644 --- a/pyignite/aio_client.py +++ b/pyignite/aio_client.py @@ -101,9 +101,8 @@ async def _connect(self, nodes): # do not try to open more nodes self._current_node = i - except connection_errors: - conn.failed = True + pass self._nodes.append(conn) @@ -303,7 +302,7 @@ async def _get_affinity(self, conn: 'AioConnection', caches: Iterable[int]) -> D """ for _ in range(AFFINITY_RETRIES or 1): result = await cache_get_node_partitions_async(conn, caches) - if result.status == 0 and result.value['partition_mapping']: + if result.status == 0: break await asyncio.sleep(AFFINITY_DELAY) @@ -343,7 +342,7 @@ async def get_best_node( asyncio.ensure_future( asyncio.gather( - *[conn.reconnect() for conn in self._nodes if not conn.alive], + *[node.reconnect() for node in self._nodes if not node.alive], return_exceptions=True ) ) diff --git a/pyignite/client.py b/pyignite/client.py index 3be7a8f..e3dd71b 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -387,7 +387,6 @@ def _connect(self, nodes): self._current_node = i except connection_errors: - conn.failed = True if self.partition_aware: # schedule the reconnection conn.reconnect() @@ -570,7 +569,7 @@ def _get_affinity(self, conn: 'Connection', caches: Iterable[int]) -> Dict: """ for _ in range(AFFINITY_RETRIES or 1): result = cache_get_node_partitions(conn, caches) - if result.status == 0 and 
result.value['partition_mapping']: + if result.status == 0: break time.sleep(AFFINITY_DELAY) @@ -613,9 +612,9 @@ def get_best_node( self._update_affinity(full_affinity) - for conn in self._nodes: - if not conn.alive: - conn.reconnect() + for node in self._nodes: + if not node.alive: + node.reconnect() c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache) parts = self._cache_partition_mapping(c_id).get('number_of_partitions') diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index c5fa24d..89de49d 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -190,10 +190,10 @@ async def _connect(self): self._on_handshake_fail(e) raise e except Exception as e: + self._on_handshake_fail(e) # restore undefined protocol version if detecting_protocol: self.client.protocol_context = None - self._on_handshake_fail(e) raise e self._on_handshake_success(result) diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index f4d38b6..9e90bb8 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -99,7 +99,7 @@ def _process_handshake_error(self, response): def _on_handshake_start(self): if logger.isEnabledFor(logging.DEBUG): logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s", - self.host, self.port, self.client.protocol_context) + self.host, self.port, self.protocol_context) if self._enabled_connection_listener: self._connection_listener.publish_handshake_start(self.host, self.port, self.protocol_context) @@ -111,11 +111,13 @@ def _on_handshake_success(self, result): if logger.isEnabledFor(logging.DEBUG): logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s", - self.host, self.port, self.uuid, self.client.protocol_context) + self.host, self.port, self.uuid, self.protocol_context) if self._enabled_connection_listener: 
self._connection_listener.publish_handshake_success(self.host, self.port, self.protocol_context, self.uuid) def _on_handshake_fail(self, err): + self.failed = True + if isinstance(err, AuthenticationError): logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s", self.host, self.port, err) @@ -124,7 +126,7 @@ def _on_handshake_fail(self, err): else: logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) " "with protocol context %s failed: %s", - self.host, self.port, self.client.protocol_context, err, exc_info=True) + self.host, self.port, self.protocol_context, err, exc_info=True) if self._enabled_connection_listener: self._connection_listener.publish_handshake_fail(self.host, self.port, self.protocol_context, err) @@ -237,10 +239,10 @@ def connect(self): self._on_handshake_fail(e) raise e except Exception as e: + self._on_handshake_fail(e) # restore undefined protocol version if detecting_protocol: self.client.protocol_context = None - self._on_handshake_fail(e) raise e self._on_handshake_success(result) @@ -281,7 +283,7 @@ def reconnect(self): if self.alive: return - self.close() + self.close(on_reconnect=True) # connect and silence the connection errors try: @@ -373,7 +375,7 @@ def recv(self, flags=None, reconnect=True) -> bytearray: return data - def close(self): + def close(self, on_reconnect=False): """ Try to mark socket closed, then unlink it. This is recommended but not required, since sockets are automatically closed when @@ -385,5 +387,6 @@ def close(self): self._socket.close() except connection_errors: pass - self._on_connection_lost(expected=True) + if not on_reconnect: + self._on_connection_lost(expected=True) self._socket = None diff --git a/pyignite/monitoring.py b/pyignite/monitoring.py index 7190fc7..55774bd 100644 --- a/pyignite/monitoring.py +++ b/pyignite/monitoring.py @@ -12,34 +12,37 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. - from typing import Optional, Sequence -class _ConnectionEvent: - __slots__ = ('host', 'port') - host: str - port: int +class _BaseEvent: + def __init__(self, **kwargs): + if kwargs: + for k, v in kwargs.items(): + object.__setattr__(self, k, v) - def __init__(self, host, port): - object.__setattr__(self, 'host', host) - object.__setattr__(self, 'port', port) + def __setattr__(self, name, value): + raise TypeError(f'{self.__class__.__name__} is immutable') def __repr__(self): pass - def __setattr__(self, name, value): - raise TypeError(f'{self.__class__.__name__} is immutable') + +class _ConnectionEvent(_BaseEvent): + __slots__ = ('host', 'port') + host: str + port: int + + def __init__(self, host, port, **kwargs): + super().__init__(host=host, port=port, **kwargs) class _HandshakeEvent(_ConnectionEvent): __slots__ = ('protocol_context',) protocol_context: Optional['ProtocolContext'] - def __init__(self, host, port, protocol_context=None): - super().__init__(host, port) - object.__setattr__(self, 'protocol_context', - protocol_context.copy() if protocol_context else None) + def __init__(self, host, port, protocol_context=None, **kwargs): + super().__init__(host, port, protocol_context=protocol_context.copy() if protocol_context else None, **kwargs) def __repr__(self): return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ @@ -47,17 +50,16 @@ def __repr__(self): class HandshakeStartEvent(_HandshakeEvent): - def __init__(self, host, port, protocol_context=None): - super().__init__(host, port, protocol_context) + def __init__(self, host, port, protocol_context=None, **kwargs): + super().__init__(host, port, protocol_context, **kwargs) class HandshakeFailedEvent(_HandshakeEvent): __slots__ = ('error_msg',) error_msg: str - def __init__(self, host, port, protocol_context=None, err=None): - super().__init__(host, port, protocol_context) - 
object.__setattr__(self, 'error_msg', str(err) if err else '') + def __init__(self, host, port, protocol_context=None, err=None, **kwargs): + super().__init__(host, port, protocol_context, error_msg=repr(err) if err else '', **kwargs) def __repr__(self): return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ @@ -72,9 +74,8 @@ class HandshakeSuccessEvent(_HandshakeEvent): __slots__ = ('node_uuid',) node_uuid: str - def __init__(self, host, port, protocol_context, node_uuid): - super().__init__(host, port, protocol_context) - object.__setattr__(self, 'node_uuid', str(node_uuid) if node_uuid else '') + def __init__(self, host, port, protocol_context, node_uuid, **kwargs): + super().__init__(host, port, protocol_context, node_uuid=str(node_uuid) if node_uuid else '', **kwargs) def __repr__(self): return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ @@ -85,9 +86,8 @@ class ConnectionClosedEvent(_ConnectionEvent): __slots__ = ('node_uuid',) node_uuid: str - def __init__(self, host, port, node_uuid): - super().__init__(host, port) - object.__setattr__(self, 'node_uuid', str(node_uuid) if node_uuid else '') + def __init__(self, host, port, node_uuid, **kwargs): + super().__init__(host, port, node_uuid=str(node_uuid) if node_uuid else '', **kwargs) def __repr__(self): return f"{self.__class__.__name__}(host={self.host}, port={self.port}, node_uuid={self.node_uuid})" @@ -98,9 +98,8 @@ class ConnectionLostEvent(ConnectionClosedEvent): node_uuid: str error_msg: str - def __init__(self, host, port, node_uuid, err): - super().__init__(host, port, node_uuid) - object.__setattr__(self, 'error_msg', str(err) if err else '') + def __init__(self, host, port, node_uuid, err, **kwargs): + super().__init__(host, port, node_uuid, error_msg=repr(err) if err else '', **kwargs) def __repr__(self): return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ @@ -131,18 +130,87 @@ def on_connection_lost(self, event: ConnectionLostEvent): 
pass +class _QueryEvent(_BaseEvent): + __slots__ = ('host', 'port', 'node_uuid', 'query_id', 'op_code', 'op_name') + host: str + port: int + node_uuid: str + query_id: int + op_code: int + op_name: str + + def __init__(self, host, port, node_uuid, query_id, op_code, op_name, **kwargs): + super().__init__(host=host, port=port, node_uuid=str(node_uuid) if node_uuid else '', + query_id=query_id, op_code=op_code, op_name=op_name, **kwargs) + + def __repr__(self): + return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ + f"node_uuid={self.node_uuid}, query_id={self.query_id}, " \ + f"op_code={self.op_code}, op_name={self.op_name})" + + +class QueryStartEvent(_QueryEvent): + pass + + +class QuerySuccessEvent(_QueryEvent): + __slots__ = ('duration', ) + duration: int + + def __init__(self, host, port, node_uuid, query_id, op_code, op_name, duration, **kwargs): + super().__init__(host, port, node_uuid, query_id, op_code, op_name, duration=duration, **kwargs) + + def __repr__(self): + return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ + f"node_uuid={self.node_uuid}, query_id={self.query_id}, " \ + f"op_code={self.op_code}, op_name={self.op_name}, duration={self.duration})" + + +class QueryFailEvent(_QueryEvent): + __slots__ = ('duration', 'err_msg') + duration: int + err_msg: str + + def __init__(self, host, port, node_uuid, query_id, op_code, op_name, duration, err, **kwargs): + super().__init__(host, port, node_uuid, query_id, op_code, op_name, duration=duration, + err_msg=repr(err) if err else '', **kwargs) + + def __repr__(self): + return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \ + f"node_uuid={self.node_uuid}, query_id={self.query_id}, op_code={self.op_code}, " \ + f"op_name={self.op_name}, duration={self.duration}, err_msg={self.err_msg})" + + +class QueryEventListener(_EventListener): + def on_query_start(self, event: QueryStartEvent): + pass + + def on_query_success(self, event: 
QuerySuccessEvent): + pass + + def on_query_fail(self, event: QueryFailEvent): + pass + + class _EventListeners: def __init__(self, listeners: Optional[Sequence]): self.__connection_listeners = [] + self.__query_listeners = [] if listeners: for listener in listeners: if isinstance(listener, ConnectionEventListener): self.__connection_listeners.append(listener) + elif isinstance(listener, QueryEventListener): + self.__query_listeners.append(listener) @property def enabled_connection_listener(self): return bool(self.__connection_listeners) + @property + def enabled_query_listener(self): + return bool(self.__query_listeners) + def publish_handshake_start(self, host, port, protocol_context): evt = HandshakeStartEvent(host, port, protocol_context) self.__publish_connection_events(lambda listener: listener.on_handshake_start(evt)) @@ -167,9 +235,28 @@ def publish_connection_lost(self, host, port, node_uuid, err): evt = ConnectionLostEvent(host, port, node_uuid, err) self.__publish_connection_events(lambda listener: listener.on_connection_lost(evt)) + def publish_query_start(self, host, port, node_uuid, query_id, op_code, op_name): + evt = QueryStartEvent(host, port, node_uuid, query_id, op_code, op_name) + self.__publish_query_events(lambda listener: listener.on_query_start(evt)) + + def publish_query_success(self, host, port, node_uuid, query_id, op_code, op_name, duration): + evt = QuerySuccessEvent(host, port, node_uuid, query_id, op_code, op_name, duration) + self.__publish_query_events(lambda listener: listener.on_query_success(evt)) + + def publish_query_fail(self, host, port, node_uuid, query_id, op_code, op_name, duration, err): + evt = QueryFailEvent(host, port, node_uuid, query_id, op_code, op_name, duration, err) + self.__publish_query_events(lambda listener: listener.on_query_fail(evt)) + def __publish_connection_events(self, callback): try: for listener in self.__connection_listeners: callback(listener) except: # noqa: 13 pass + + def 
__publish_query_events(self, callback): + try: + for listener in self.__query_listeners: + callback(listener) + except: # noqa: 13 + pass diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py index 89c354e..c36ba34 100644 --- a/pyignite/queries/query.py +++ b/pyignite/queries/query.py @@ -227,12 +227,25 @@ def __post_process_response(conn, response_struct, response): # build result return APIResult(response) + @staticmethod + def _enabled_query_listener(conn): + client = conn.client + return client._event_listeners and client._event_listeners.enabled_query_listener + + @staticmethod + def _event_listener(conn): + return conn.client._event_listeners + def _on_query_started(self, conn): + self._start_ts = time.monotonic() if logger.isEnabledFor(logging.DEBUG): - self._start_ts = time.monotonic() logger.debug("Start query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s)", self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid) + if self._enabled_query_listener(conn): + self._event_listener(conn).publish_query_start(conn.host, conn.port, conn.uuid, self.query_id, + self.op_code, _get_op_code_name(self.op_code)) + def _on_query_finished(self, conn, result=None, err=None): if logger.isEnabledFor(logging.DEBUG): dur_ms = _sec_to_millis(time.monotonic() - self._start_ts) @@ -242,10 +255,18 @@ def _on_query_finished(self, conn, result=None, err=None): logger.debug("Failed to perform query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) " "in %.3f ms: %s", self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid, dur_ms, err) + if self._enabled_query_listener(conn): + self._event_listener(conn).publish_query_fail(conn.host, conn.port, conn.uuid, self.query_id, + self.op_code, _get_op_code_name(self.op_code), + dur_ms, err) else: logger.debug("Finished query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) " "successfully in %.3f ms", self.query_id, _get_op_code_name(self.op_code), conn.host, 
conn.port, conn.uuid, dur_ms) + if self._enabled_query_listener(conn): + self._event_listener(conn).publish_query_success(conn.host, conn.port, conn.uuid, self.query_id, + self.op_code, _get_op_code_name(self.op_code), + dur_ms) class ConfigQuery(Query): diff --git a/tests/affinity/test_affinity_request_routing.py b/tests/affinity/test_affinity_request_routing.py index 0d0ec24..42bb8c1 100644 --- a/tests/affinity/test_affinity_request_routing.py +++ b/tests/affinity/test_affinity_request_routing.py @@ -22,11 +22,10 @@ from pyignite import GenericObjectMeta, AioClient, Client from pyignite.aio_cache import AioCache -from pyignite.connection import Connection, AioConnection -from pyignite.constants import PROTOCOL_BYTE_ORDER from pyignite.datatypes import String, LongObject from pyignite.datatypes.cache_config import CacheMode from pyignite.datatypes.prop_codes import PROP_NAME, PROP_BACKUPS_NUMBER, PROP_CACHE_KEY_CONFIGURATION, PROP_CACHE_MODE +from pyignite.monitoring import QueryEventListener from tests.util import wait_for_condition, wait_for_condition_async, start_ignite, kill_process_tree try: @@ -35,41 +34,37 @@ from async_generator import asynccontextmanager requests = deque() -old_send = Connection.send -old_send_async = AioConnection._send -def patched_send(self, *args, **kwargs): - """Patched send function that push to queue idx of server to which request is routed.""" - buf = args[0] - if buf and len(buf) >= 6: - op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER) - # Filter only caches operation. 
- if 1000 <= op_code < 1100: - requests.append(self.port % 100) - return old_send(self, *args, **kwargs) +class QueryRouteListener(QueryEventListener): + def on_query_start(self, event): + if 1000 <= event.op_code < 1100: + requests.append(event.port % 100) -async def patched_send_async(self, *args, **kwargs): - """Patched send function that push to queue idx of server to which request is routed.""" - buf = args[1] - if buf and len(buf) >= 6: - op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER) - # Filter only caches operation. - if 1000 <= op_code < 1100: - requests.append(self.port % 100) - return await old_send_async(self, *args, **kwargs) +client_connection_string = [('127.0.0.1', 10800 + idx) for idx in range(1, 5)] -def setup_function(): - requests.clear() - Connection.send = patched_send - AioConnection._send = patched_send_async +@pytest.fixture +def client(): + client = Client(partition_aware=True, event_listeners=[QueryRouteListener()]) + try: + client.connect(client_connection_string) + yield client + finally: + requests.clear() + client.close() -def teardown_function(): - Connection.send = old_send - AioConnection.send = old_send_async +@pytest.fixture +async def async_client(event_loop): + client = AioClient(partition_aware=True, event_listeners=[QueryRouteListener()]) + try: + await client.connect(client_connection_string) + yield client + finally: + requests.clear() + await client.close() def wait_for_affinity_distribution(cache, key, node_idx, timeout=30): @@ -112,7 +107,8 @@ async def check_grid_idx(): @pytest.mark.parametrize("key,grid_idx", [(1, 1), (2, 2), (3, 3), (4, 1), (5, 1), (6, 2), (11, 1), (13, 1), (19, 1)]) @pytest.mark.parametrize("backups", [0, 1, 2, 3]) -def test_cache_operation_on_primitive_key_routes_request_to_primary_node(request, key, grid_idx, backups, client): +def test_cache_operation_on_primitive_key_routes_request_to_primary_node(request, key, grid_idx, backups, + client): cache = client.get_or_create_cache({ 
PROP_NAME: request.node.name + str(backups), PROP_BACKUPS_NUMBER: backups, @@ -210,47 +206,24 @@ class AffinityTestType1( assert requests.pop() == grid_idx -client_routed_connection_string = [('127.0.0.1', 10800 + idx) for idx in range(1, 5)] - - -@pytest.fixture -def client_routed(): - client = Client(partition_aware=True) - try: - client.connect(client_routed_connection_string) - yield client - finally: - client.close() - - -@pytest.fixture -def client_routed_cache(client_routed, request): - yield client_routed.get_or_create_cache(request.node.name) - - @pytest.fixture -async def async_client_routed(event_loop): - client = AioClient(partition_aware=True) - try: - await client.connect(client_routed_connection_string) - yield client - finally: - await client.close() +def client_cache(client, request): + yield client.get_or_create_cache(request.node.name) @pytest.fixture -async def async_client_routed_cache(async_client_routed, request): - cache = await async_client_routed.get_or_create_cache(request.node.name) +async def async_client_cache(async_client, request): + cache = await async_client.get_or_create_cache(request.node.name) yield cache -def test_cache_operation_routed_to_new_cluster_node(client_routed_cache): - __perform_cache_operation_routed_to_new_node(client_routed_cache) +def test_cache_operation_routed_to_new_cluster_node(client_cache): + __perform_cache_operation_routed_to_new_node(client_cache) @pytest.mark.asyncio -async def test_cache_operation_routed_to_new_cluster_node_async(async_client_routed_cache): - await __perform_cache_operation_routed_to_new_node(async_client_routed_cache) +async def test_cache_operation_routed_to_new_cluster_node_async(async_client_cache): + await __perform_cache_operation_routed_to_new_node(async_client_cache) def __perform_cache_operation_routed_to_new_node(cache): @@ -423,8 +396,8 @@ async def test_new_registered_cache_affinity_async(async_client): assert requests.pop() == 3 -def 
test_all_registered_cache_updated_on_new_server(client_routed): - with create_caches(client_routed) as caches: +def test_all_registered_cache_updated_on_new_server(client): + with create_caches(client) as caches: key = 12 test_cache = random.choice(caches) wait_for_affinity_distribution(test_cache, key, 3) @@ -444,8 +417,8 @@ def test_all_registered_cache_updated_on_new_server(client_routed): @pytest.mark.asyncio -async def test_all_registered_cache_updated_on_new_server_async(async_client_routed): - async with create_caches_async(async_client_routed) as caches: +async def test_all_registered_cache_updated_on_new_server_async(async_client): + async with create_caches_async(async_client) as caches: key = 12 test_cache = random.choice(caches) await wait_for_affinity_distribution_async(test_cache, key, 3) diff --git a/tests/common/test_query_listener.py b/tests/common/test_query_listener.py new file mode 100644 index 0000000..afff542 --- /dev/null +++ b/tests/common/test_query_listener.py @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import pytest + +from pyignite import Client, AioClient +from pyignite.exceptions import CacheError +from pyignite.monitoring import QueryEventListener, QueryStartEvent, QueryFailEvent, QuerySuccessEvent +from pyignite.queries.op_codes import OP_CACHE_PUT, OP_CACHE_PARTITIONS, OP_CLUSTER_GET_STATE + +events = [] + + +class QueryRouteListener(QueryEventListener): + def on_query_start(self, event): + if event.op_code != OP_CACHE_PARTITIONS: + events.append(event) + + def on_query_fail(self, event): + if event.op_code != OP_CACHE_PARTITIONS: + events.append(event) + + def on_query_success(self, event): + if event.op_code != OP_CACHE_PARTITIONS: + events.append(event) + + +@pytest.fixture +def client(): + client = Client(event_listeners=[QueryRouteListener()]) + try: + client.connect('127.0.0.1', 10801) + yield client + finally: + client.close() + events.clear() + + +@pytest.fixture +async def async_client(event_loop): + client = AioClient(event_listeners=[QueryRouteListener()]) + try: + await client.connect('127.0.0.1', 10801) + yield client + finally: + await client.close() + events.clear() + + +def test_query_fail_events(request, client): + with pytest.raises(CacheError): + cache = client.get_cache(request.node.name) + cache.put(1, 1) + + __assert_fail_events(client) + + +@pytest.mark.asyncio +async def test_query_fail_events_async(request, async_client): + with pytest.raises(CacheError): + cache = await async_client.get_cache(request.node.name) + await cache.put(1, 1) + + __assert_fail_events(async_client) + + +def __assert_fail_events(client): + assert len(events) == 2 + conn = client._nodes[0] + for ev in events: + if isinstance(ev, QueryStartEvent): + assert ev.op_code == OP_CACHE_PUT + assert ev.op_name == 'OP_CACHE_PUT' + assert ev.host == conn.host + assert ev.port == conn.port + assert ev.node_uuid == str(conn.uuid if conn.uuid else '') + + if isinstance(ev, QueryFailEvent): + assert ev.op_code == OP_CACHE_PUT + assert ev.op_name == 'OP_CACHE_PUT' + assert 
ev.host == conn.host + assert ev.port == conn.port + assert ev.node_uuid == str(conn.uuid if conn.uuid else '') + assert 'Cache does not exist' in ev.err_msg + assert ev.duration > 0 + + +def test_query_success_events(client): + client.get_cluster().get_state() + __assert_success_events(client) + + +@pytest.mark.asyncio +async def test_query_success_events_async(async_client): + await async_client.get_cluster().get_state() + __assert_success_events(async_client) + + +def __assert_success_events(client): + assert len(events) == 2 + conn = client._nodes[0] + for ev in events: + if isinstance(ev, QueryStartEvent): + assert ev.op_code == OP_CLUSTER_GET_STATE + assert ev.op_name == 'OP_CLUSTER_GET_STATE' + assert ev.host == conn.host + assert ev.port == conn.port + assert ev.node_uuid == str(conn.uuid if conn.uuid else '') + + if isinstance(ev, QuerySuccessEvent): + assert ev.op_code == OP_CLUSTER_GET_STATE + assert ev.op_name == 'OP_CLUSTER_GET_STATE' + assert ev.host == conn.host + assert ev.port == conn.port + assert ev.node_uuid == str(conn.uuid if conn.uuid else '') + assert ev.duration > 0 diff --git a/tests/security/test_auth.py b/tests/security/test_auth.py index e0dcaa4..503cf88 100644 --- a/tests/security/test_auth.py +++ b/tests/security/test_auth.py @@ -18,7 +18,9 @@ import pytest from pyignite import Client, AioClient -from pyignite import monitoring +from pyignite.monitoring import ( + HandshakeStartEvent, HandshakeSuccessEvent, ConnectionClosedEvent, AuthenticationFailedEvent +) from pyignite.exceptions import AuthenticationError from tests.security.conftest import AccumulatingConnectionListener from tests.util import start_ignite_gen, clear_ignite_work_dir @@ -52,9 +54,10 @@ def test_auth_success(with_ssl, ssl_params, caplog): with caplog.at_level(logger='pyignite', level=logging.DEBUG): with client.connect("127.0.0.1", 10801): assert all(node.alive for node in client._nodes) + conn = client._nodes[0] - __assert_successful_connect_log(caplog) - 
__assert_successful_connect_events(listener) + __assert_successful_connect_log(conn, caplog) + __assert_successful_connect_events(conn, listener) @pytest.mark.asyncio @@ -66,29 +69,35 @@ async def test_auth_success_async(with_ssl, ssl_params, caplog): with caplog.at_level(logger='pyignite', level=logging.DEBUG): async with client.connect("127.0.0.1", 10801): assert all(node.alive for node in client._nodes) + conn = client._nodes[0] - __assert_successful_connect_log(caplog) - __assert_successful_connect_events(listener) + __assert_successful_connect_log(conn, caplog) + __assert_successful_connect_events(conn, listener) -def __assert_successful_connect_log(caplog): - assert any(re.match(r'Connecting to node\(address=127.0.0.1,\s+port=10801', r.message) for r in caplog.records) - assert any(re.match(r'Connected to node\(address=127.0.0.1,\s+port=10801', r.message) for r in caplog.records) - assert any(re.match(r'Connection closed to node\(address=127.0.0.1,\s+port=10801', r.message) +def __assert_successful_connect_log(conn, caplog): + assert any(re.match(rf'Connecting to node\(address={conn.host},\s+port={conn.port}', r.message) + for r in caplog.records) + assert any(re.match(rf'Connected to node\(address={conn.host},\s+port={conn.port}', r.message) + for r in caplog.records) + assert any(re.match(rf'Connection closed to node\(address={conn.host},\s+port={conn.port}', r.message) for r in caplog.records) -def __assert_successful_connect_events(listener): - event_classes = (monitoring.HandshakeStartEvent, monitoring.HandshakeSuccessEvent, - monitoring.ConnectionClosedEvent) +def __assert_successful_connect_events(conn, listener): + event_classes = (HandshakeStartEvent, HandshakeSuccessEvent, ConnectionClosedEvent) for cls in event_classes: any(isinstance(ev, cls) for ev in listener.events) for ev in listener.events: if isinstance(ev, event_classes): - assert ev.host == "127.0.0.1" - assert ev.port == 10801 + assert ev.host == conn.host + assert ev.port == conn.port + 
if isinstance(ev, (HandshakeSuccessEvent, ConnectionClosedEvent)): + assert ev.node_uuid == str(conn.uuid if conn.uuid else '') + if isinstance(ev, HandshakeSuccessEvent): + assert ev.protocol_context auth_failed_params = [ @@ -142,8 +151,10 @@ def __assert_auth_failed_log(caplog): def __assert_auth_failed_listener(listener): found = False for ev in listener.events: - if isinstance(ev, monitoring.AuthenticationFailedEvent): + if isinstance(ev, AuthenticationFailedEvent): found = True assert ev.host == '127.0.0.1' assert ev.port == 10801 + assert ev.protocol_context + assert 'AuthenticationError' in ev.error_msg assert found diff --git a/tests/security/test_ssl.py b/tests/security/test_ssl.py index 7a80871..ed0808b 100644 --- a/tests/security/test_ssl.py +++ b/tests/security/test_ssl.py @@ -113,4 +113,5 @@ def __assert_handshake_failed_listener(listener): found = True assert ev.host == '127.0.0.1' assert ev.port == 10801 + assert ev.error_msg assert found From e2ddd7cc83d8fee7e6ba20a3671b750200b23bb3 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Fri, 16 Jul 2021 14:36:36 +0300 Subject: [PATCH 3/6] IGNITE-15102 Add more tests and documentation. --- docs/modules.rst | 1 + .../pyignite.connection.protocol_context.rst | 20 ++ docs/source/pyignite.connection.rst | 6 + docs/source/pyignite.monitoring.rst | 21 ++ docs/source/pyignite.rst | 1 + pyignite/connection/connection.py | 2 +- pyignite/monitoring.py | 195 ++++++++++++++++++ .../affinity/test_affinity_request_routing.py | 49 +++++ tests/custom/test_connection_events.py | 129 ++++++++++++ 9 files changed, 423 insertions(+), 1 deletion(-) create mode 100644 docs/source/pyignite.connection.protocol_context.rst create mode 100644 docs/source/pyignite.monitoring.rst create mode 100644 tests/custom/test_connection_events.py diff --git a/docs/modules.rst b/docs/modules.rst index 0cce570..bdeec8e 100644 --- a/docs/modules.rst +++ b/docs/modules.rst @@ -31,3 +31,4 @@ of `pyignite`, intended for end users. 
datatypes/parsers datatypes/cache_props Exceptions + Monitoring and handling events diff --git a/docs/source/pyignite.connection.protocol_context.rst b/docs/source/pyignite.connection.protocol_context.rst new file mode 100644 index 0000000..a5298ba --- /dev/null +++ b/docs/source/pyignite.connection.protocol_context.rst @@ -0,0 +1,20 @@ +.. Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +pyignite.connection.protocol_context package +=========================== + +.. automodule:: pyignite.connection.protocol_context + :members: \ No newline at end of file diff --git a/docs/source/pyignite.connection.rst b/docs/source/pyignite.connection.rst index 90c59db..29c2e57 100644 --- a/docs/source/pyignite.connection.rst +++ b/docs/source/pyignite.connection.rst @@ -20,3 +20,9 @@ pyignite.connection package :members: :undoc-members: :show-inheritance: + +Submodules +---------- + +.. toctree:: + pyignite.connection.protocol_context \ No newline at end of file diff --git a/docs/source/pyignite.monitoring.rst b/docs/source/pyignite.monitoring.rst new file mode 100644 index 0000000..98b137d --- /dev/null +++ b/docs/source/pyignite.monitoring.rst @@ -0,0 +1,21 @@ +.. 
Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +pyignite.monitoring module +=========================== + +.. automodule:: pyignite.monitoring + :members: + :member-order: bysource diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.rst index 2e52500..7a0744c 100644 --- a/docs/source/pyignite.rst +++ b/docs/source/pyignite.rst @@ -44,4 +44,5 @@ Submodules pyignite.transaction pyignite.cursors pyignite.exceptions + pyignite.monitoring diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index 9e90bb8..2b9970a 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -387,6 +387,6 @@ def close(self, on_reconnect=False): self._socket.close() except connection_errors: pass - if not on_reconnect: + if not on_reconnect and not self.failed: self._on_connection_lost(expected=True) self._socket = None diff --git a/pyignite/monitoring.py b/pyignite/monitoring.py index 55774bd..9bbfd20 100644 --- a/pyignite/monitoring.py +++ b/pyignite/monitoring.py @@ -12,6 +12,53 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +""" Tools to monitor client's events. 
+ +For example, a simple query logger might be implemented like this:: + + import logging + + from pyignite import monitoring + + class QueryLogger(monitoring.QueryEventListener): + + def on_query_start(self, event): + logging.info(f"Query {event.op_name} with query id " + f"{event.query_id} started on server " + f"{event.host}:{event.port}") + + def on_query_fail(self, event): + logging.info(f"Query {event.op_name} with query id " + f"{event.query_id} on server " + f"{event.host}:{event.port} " + f"failed in {event.duration}ms " + f"with error {event.error_msg}") + + def on_query_success(self, event): + logging.info(f"Query {event.op_name} with query id " + f"{event.query_id} on server " \ + f"{event.host}:{event.port} " \ + f"succeeded in {event.duration}ms") + +:class:`~ConnectionEventListener` is also available. + +Event listeners can be registered by passing parameter to :class:`~pyignite.client.Client` or +:class:`~pyignite.aio_client.AioClient` constructor:: + + client = Client(event_listeners=[QueryLogger()]) + with client.connect('127.0.0.1', 10800): + .... + +.. note:: Events are delivered **synchronously**. Application threads block + waiting for event handlers. Care must be taken to ensure that your event handlers are efficient + enough to not adversely affect overall application performance. + +.. note:: Debug logging is also available, standard ``logging`` is used. Just set ``DEBUG`` level to + *pyignite* logger. +| +| +""" from typing import Optional, Sequence @@ -50,15 +97,36 @@ def __repr__(self): class HandshakeStartEvent(_HandshakeEvent): + """ + Published when a handshake started. + + :ivar host: Address of the node to connect, + :ivar port: Port number of the node to connect, + :ivar protocol_context: Client's protocol context. + """ def __init__(self, host, port, protocol_context=None, **kwargs): + """ + This class is not supposed to be constructed by user. 
+ """ super().__init__(host, port, protocol_context, **kwargs) class HandshakeFailedEvent(_HandshakeEvent): + """ + Published when a handshake failed. + + :ivar host: Address of the node to connect, + :ivar port: Port number of the node to connect, + :ivar protocol_context: Client's protocol context, + :ivar error_msg: Error message. + """ __slots__ = ('error_msg',) error_msg: str def __init__(self, host, port, protocol_context=None, err=None, **kwargs): + """ + This class is not supposed to be constructed by user. + """ super().__init__(host, port, protocol_context, error_msg=repr(err) if err else '', **kwargs) def __repr__(self): @@ -67,14 +135,33 @@ def __repr__(self): class AuthenticationFailedEvent(HandshakeFailedEvent): + """ + Published when an authentication is failed. + + :ivar host: Address of the node to connect, + :ivar port: Port number of the node to connect, + :ivar protocol_context: Client protocol context, + :ivar error_msg: Error message. + """ pass class HandshakeSuccessEvent(_HandshakeEvent): + """ + Published when a handshake succeeded. + + :ivar host: Address of the node to connect, + :ivar port: Port number of the node to connect, + :ivar protocol_context: Client's protocol context, + :ivar node_uuid: Node's uuid, string. + """ __slots__ = ('node_uuid',) node_uuid: str def __init__(self, host, port, protocol_context, node_uuid, **kwargs): + """ + This class is not supposed to be constructed by user. + """ super().__init__(host, port, protocol_context, node_uuid=str(node_uuid) if node_uuid else '', **kwargs) def __repr__(self): @@ -83,10 +170,20 @@ def __repr__(self): class ConnectionClosedEvent(_ConnectionEvent): + """ + Published when a connection to the node is expectedly closed. + + :ivar host: Address of node to connect, + :ivar port: Port number of node to connect, + :ivar node_uuid: Node uuid, string. 
+ """ __slots__ = ('node_uuid',) node_uuid: str def __init__(self, host, port, node_uuid, **kwargs): + """ + This class is not supposed to be constructed by user. + """ super().__init__(host, port, node_uuid=str(node_uuid) if node_uuid else '', **kwargs) def __repr__(self): @@ -94,11 +191,22 @@ def __repr__(self): class ConnectionLostEvent(ConnectionClosedEvent): + """ + Published when a connection to the node is lost. + + :ivar host: Address of the node to connect, + :ivar port: Port number of the node to connect, + :ivar node_uuid: Node's uuid, string, + :ivar error_msg: Error message. + """ __slots__ = ('error_msg',) node_uuid: str error_msg: str def __init__(self, host, port, node_uuid, err, **kwargs): + """ + This class is not supposed to be constructed by user. + """ super().__init__(host, port, node_uuid, error_msg=repr(err) if err else '', **kwargs) def __repr__(self): @@ -111,22 +219,55 @@ class _EventListener: class ConnectionEventListener(_EventListener): + """ + Base class for connection event listeners. + """ def on_handshake_start(self, event: HandshakeStartEvent): + """ + Handle handshake start event. + + :param event: Instance of :class:`HandshakeStartEvent`. + """ pass def on_handshake_success(self, event: HandshakeSuccessEvent): + """ + Handle handshake success event. + + :param event: Instance of :class:`HandshakeSuccessEvent`. + """ pass def on_handshake_fail(self, event: HandshakeFailedEvent): + """ + Handle handshake failed event. + + :param event: Instance of :class:`HandshakeFailedEvent`. + """ pass def on_authentication_fail(self, event: AuthenticationFailedEvent): + """ + Handle authentication failed event. + + :param event: Instance of :class:`AuthenticationFailedEvent`. + """ pass def on_connection_closed(self, event: ConnectionClosedEvent): + """ + Handle connection closed event. + + :param event: Instance of :class:`ConnectionClosedEvent`. 
+        """
         pass
 
     def on_connection_lost(self, event: ConnectionLostEvent):
+        """
+        Handle connection lost event.
+
+        :param event: Instance of :class:`ConnectionLostEvent`.
+        """
         pass
 
 
@@ -140,6 +281,9 @@ class _QueryEvent(_BaseEvent):
     op_name: str
 
     def __init__(self, host, port, node_uuid, query_id, op_code, op_name, **kwargs):
+        """
+        This class is not supposed to be constructed by user.
+        """
         super().__init__(host=host, port=port, node_uuid=str(node_uuid) if node_uuid else '',
                          query_id=query_id, op_code=op_code, op_name=op_name, **kwargs)
 
@@ -150,10 +294,31 @@ def __repr__(self):
 
 
 class QueryStartEvent(_QueryEvent):
+    """
+    Published when a client's query started.
+
+    :ivar host: Address of the node on which the query is executed,
+    :ivar port: Port number of the node on which the query is executed,
+    :ivar node_uuid: Node's uuid, string,
+    :ivar query_id: Query's id,
+    :ivar op_code: Operation's id,
+    :ivar op_name: Operation's name.
+    """
     pass
 
 
class QuerySuccessEvent(_QueryEvent):
+    """
+    Published when a client's query finished successfully.
+
+    :ivar host: Address of the node on which the query is executed,
+    :ivar port: Port number of the node on which the query is executed,
+    :ivar node_uuid: Node's uuid, string,
+    :ivar query_id: Query's id,
+    :ivar op_code: Operation's id,
+    :ivar op_name: Operation's name,
+    :ivar duration: Query's duration in milliseconds.
+    """
     __slots__ = ('duration', )
     duration: int
 
@@ -167,6 +332,18 @@ def __repr__(self):
 
 
 class QueryFailEvent(_QueryEvent):
+    """
+    Published when a client's query failed.
+
+    :ivar host: Address of the node on which the query is executed,
+    :ivar port: Port number of the node on which the query is executed,
+    :ivar node_uuid: Node's uuid, string,
+    :ivar query_id: Query's id,
+    :ivar op_code: Operation's id,
+    :ivar op_name: Operation's name,
+    :ivar duration: Query's duration in milliseconds,
+    :ivar err_msg: Error message.
+ """ __slots__ = ('duration', 'err_msg') duration: int err_msg: str @@ -182,13 +359,31 @@ def __repr__(self): class QueryEventListener(_EventListener): + """ + Base class for query event listeners. + """ def on_query_start(self, event: QueryStartEvent): + """ + Handle query start event. + + :param event: Instance of :class:`QueryStartEvent`. + """ pass def on_query_success(self, event: QuerySuccessEvent): + """ + Handle query success event. + + :param event: Instance of :class:`QuerySuccessEvent`. + """ pass def on_query_fail(self, event: QueryFailEvent): + """ + Handle query fail event. + + :param event: Instance of :class:`QueryFailEvent`. + """ pass diff --git a/tests/affinity/test_affinity_request_routing.py b/tests/affinity/test_affinity_request_routing.py index 42bb8c1..b73eff3 100644 --- a/tests/affinity/test_affinity_request_routing.py +++ b/tests/affinity/test_affinity_request_routing.py @@ -301,6 +301,55 @@ async def test_replicated_cache_operation_routed_to_random_node_async(async_repl await verify_random_node(async_replicated_cache) +def test_replicated_cache_operation_not_routed_to_failed_node(replicated_cache): + srv = start_ignite(idx=4) + try: + while True: + replicated_cache.put(1, 1) + + if requests.pop() == 4: + break + + kill_process_tree(srv.pid) + + num_failures = 0 + for i in range(100): + # Request may fail one time, because query can be requested before affinity update or connection + # lost will be detected. + try: + replicated_cache.put(1, 1) + except: # noqa 13 + num_failures += 1 + assert num_failures <= 1, "Expected no more than 1 failure." 
+ finally: + kill_process_tree(srv.pid) + + +@pytest.mark.asyncio +async def test_replicated_cache_operation_not_routed_to_failed_node_async(async_replicated_cache): + srv = start_ignite(idx=4) + try: + while True: + await async_replicated_cache.put(1, 1) + + if requests.pop() == 4: + break + + kill_process_tree(srv.pid) + + num_failures = 0 + for i in range(100): + # Request may fail one time, because query can be requested before affinity update or connection + # lost will be detected. + try: + await async_replicated_cache.put(1, 1) + except: # noqa 13 + num_failures += 1 + assert num_failures <= 1, "Expected no more than 1 failure." + finally: + kill_process_tree(srv.pid) + + def verify_random_node(cache): key = 1 diff --git a/tests/custom/test_connection_events.py b/tests/custom/test_connection_events.py new file mode 100644 index 0000000..bee9395 --- /dev/null +++ b/tests/custom/test_connection_events.py @@ -0,0 +1,129 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import random + +import pytest + +from pyignite import Client, AioClient +from pyignite.monitoring import ConnectionEventListener, ConnectionLostEvent, ConnectionClosedEvent, \ + HandshakeSuccessEvent, HandshakeFailedEvent, HandshakeStartEvent + +from tests.util import start_ignite_gen, kill_process_tree + + +@pytest.fixture(autouse=True) +def server1(): + yield from start_ignite_gen(idx=1) + + +@pytest.fixture(autouse=True) +def server2(): + yield from start_ignite_gen(idx=2) + + +events = [] + + +def teardown_function(): + events.clear() + + +class RecordingConnectionEventListener(ConnectionEventListener): + def on_handshake_start(self, event): + events.append(event) + + def on_handshake_success(self, event): + events.append(event) + + def on_handshake_fail(self, event): + events.append(event) + + def on_authentication_fail(self, event): + events.append(event) + + def on_connection_closed(self, event): + events.append(event) + + def on_connection_lost(self, event): + events.append(event) + + +def test_events(request, server2): + client = Client(event_listeners=[RecordingConnectionEventListener()]) + with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]): + protocol_context = client.protocol_context + nodes = {conn.port: conn for conn in client._nodes} + cache = client.get_or_create_cache(request.node.name) + kill_process_tree(server2.pid) + + while True: + try: + cache.put(random.randint(0, 1000), 1) + except: # noqa 13 + pass + + if any(isinstance(e, ConnectionLostEvent) for e in events): + break + + __assert_events(nodes, protocol_context) + + +@pytest.mark.asyncio +async def test_events_async(request, server2): + client = AioClient(event_listeners=[RecordingConnectionEventListener()]) + async with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]): + protocol_context = client.protocol_context + nodes = {conn.port: conn for conn in client._nodes} + cache = await client.get_or_create_cache(request.node.name) + 
kill_process_tree(server2.pid) + + while True: + try: + await cache.put(random.randint(0, 1000), 1) + except: # noqa 13 + pass + + if any(isinstance(e, ConnectionLostEvent) for e in events): + break + + __assert_events(nodes, protocol_context) + + +def __assert_events(nodes, protocol_context): + assert len([e for e in events if isinstance(e, ConnectionLostEvent)]) == 1 + # ConnectionLostEvent is a subclass of ConnectionClosedEvent + assert len([e for e in events if type(e) == ConnectionClosedEvent]) == 1 + assert len([e for e in events if isinstance(e, HandshakeSuccessEvent)]) == 2 + + for ev in events: + assert ev.host == '127.0.0.1' + if isinstance(ev, ConnectionLostEvent): + assert ev.port == 10802 + assert ev.node_uuid == str(nodes[ev.port].uuid) + assert ev.error_msg + elif isinstance(ev, HandshakeStartEvent): + assert ev.protocol_context == protocol_context + assert ev.port in {10801, 10802} + elif isinstance(ev, HandshakeFailedEvent): + assert ev.port == 10802 + assert ev.protocol_context == protocol_context + assert ev.error_msg + elif isinstance(ev, HandshakeSuccessEvent): + assert ev.port in {10801, 10802} + assert ev.node_uuid == str(nodes[ev.port].uuid) + assert ev.protocol_context == protocol_context + elif isinstance(ev, ConnectionClosedEvent): + assert ev.port == 10801 + assert ev.node_uuid == str(nodes[ev.port].uuid) From 934f941f8285376788855a1ccd5618128111a75c Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Fri, 16 Jul 2021 14:49:16 +0300 Subject: [PATCH 4/6] IGNITE-15102 Remove file added by mistake --- pyignite/stream/aio_cluster.py | 53 ---------------------------------- 1 file changed, 53 deletions(-) delete mode 100644 pyignite/stream/aio_cluster.py diff --git a/pyignite/stream/aio_cluster.py b/pyignite/stream/aio_cluster.py deleted file mode 100644 index 8a2f98e..0000000 --- a/pyignite/stream/aio_cluster.py +++ /dev/null @@ -1,53 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license 
agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -This module contains `AioCluster` that lets you get info and change state of the -whole cluster. -""" -from pyignite import AioClient -from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async - - -class AioCluster: - """ - Ignite cluster abstraction. Users should never use this class directly, - but construct its instances with - :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead. - """ - - def __init__(self, client: 'AioClient'): - self._client = client - - async def get_state(self): - """ - Gets current cluster state. - - :return: Current cluster state. This is one of ClusterState.INACTIVE, - ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. - """ - return await cluster_get_state_async(await self._client.random_node()) - - async def set_state(self, state): - """ - Changes current cluster state to the given. - - Note: Deactivation clears in-memory caches (without persistence) - including the system caches. - - :param state: New cluster state. This is one of ClusterState.INACTIVE, - ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. 
- """ - return await cluster_set_state_async(await self._client.random_node(), state) From db41a0a0281a37b536280ac2ab2e48eb7b549635 Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Fri, 16 Jul 2021 14:59:56 +0300 Subject: [PATCH 5/6] IGNITE-15102 Fix formatting of debug msg --- pyignite/queries/query.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py index c36ba34..c141b26 100644 --- a/pyignite/queries/query.py +++ b/pyignite/queries/query.py @@ -253,7 +253,7 @@ def _on_query_finished(self, conn, result=None, err=None): err = result.message if err: logger.debug("Failed to perform query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) " - "in %.3f ms: %s", self.query_id, _get_op_code_name(self.op_code), + "in %d ms: %s", self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid, dur_ms, err) if self._enabled_query_listener(conn): self._event_listener(conn).publish_query_fail(conn.host, conn.port, conn.uuid, self.query_id, @@ -261,7 +261,7 @@ def _on_query_finished(self, conn, result=None, err=None): dur_ms, err) else: logger.debug("Finished query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) " - "successfully in %.3f ms", self.query_id, _get_op_code_name(self.op_code), + "successfully in %d ms", self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid, dur_ms) if self._enabled_query_listener(conn): self._event_listener(conn).publish_query_success(conn.host, conn.port, conn.uuid, self.query_id, From 345b805e729edeecfed6e7ba08275be3eb0a8dce Mon Sep 17 00:00:00 2001 From: Ivan Daschinsky Date: Mon, 19 Jul 2021 09:54:22 +0300 Subject: [PATCH 6/6] IGNITE-15102 Add wait for affinity distribution ready for cache. 
--- tests/affinity/test_affinity.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/affinity/test_affinity.py b/tests/affinity/test_affinity.py index 3097991..c9a6b60 100644 --- a/tests/affinity/test_affinity.py +++ b/tests/affinity/test_affinity.py @@ -36,7 +36,6 @@ def test_get_node_partitions(client, caches): cache_ids = [cache.cache_id for cache in caches] - __wait_for_ready_affinity(client, cache_ids) mappings = __get_mappings(client, cache_ids) __check_mappings(mappings, cache_ids) @@ -44,7 +43,6 @@ def test_get_node_partitions(client, caches): @pytest.mark.asyncio async def test_get_node_partitions_async(async_client, async_caches): cache_ids = [cache.cache_id for cache in async_caches] - await __wait_for_ready_affinity(async_client, cache_ids) mappings = await __get_mappings(async_client, cache_ids) __check_mappings(mappings, cache_ids) @@ -157,6 +155,7 @@ def inner(): caches = [] try: caches = generate_caches() + __wait_for_ready_affinity(client, [cache.cache_id for cache in caches]) yield caches finally: for cache in caches: @@ -166,6 +165,7 @@ async def inner_async(): caches = [] try: caches = await generate_caches() + await __wait_for_ready_affinity(client, [cache.cache_id for cache in caches]) yield caches finally: await asyncio.gather(*[cache.destroy() for cache in caches]) @@ -180,6 +180,7 @@ def cache(client): PROP_CACHE_MODE: CacheMode.PARTITIONED, }) try: + __wait_for_ready_affinity(client, [cache.cache_id]) yield cache finally: cache.destroy() @@ -192,6 +193,7 @@ async def async_cache(async_client): PROP_CACHE_MODE: CacheMode.PARTITIONED, }) try: + await __wait_for_ready_affinity(async_client, [cache.cache_id]) yield cache finally: await cache.destroy()