From db9d5e92442e8198ac91392dbd4cb7676965291e Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 01:51:04 +0300 Subject: [PATCH 01/28] IGNITE-14465 Add the ability to set and get cluster state --- pyignite/aio_client.py | 8 +++ pyignite/api/cluster.py | 100 ++++++++++++++++++++++++++++ pyignite/client.py | 9 +++ pyignite/cluster.py | 53 +++++++++++++++ pyignite/datatypes/cluster_state.py | 28 ++++++++ pyignite/queries/op_codes.py | 5 +- pyignite/stream/aio_cluster.py | 53 +++++++++++++++ 7 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 pyignite/api/cluster.py create mode 100644 pyignite/cluster.py create mode 100644 pyignite/datatypes/cluster_state.py create mode 100644 pyignite/stream/aio_cluster.py diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py index 5e64450..f260e39 100644 --- a/pyignite/aio_client.py +++ b/pyignite/aio_client.py @@ -460,3 +460,11 @@ def sql( return AioSqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout) + + def get_cluster(self) -> 'AioCluster': + """ + Gets client cluster facade. + + :return: AioClient cluster facade. + """ + return AioCluster(self) diff --git a/pyignite/api/cluster.py b/pyignite/api/cluster.py new file mode 100644 index 0000000..f1555e7 --- /dev/null +++ b/pyignite/api/cluster.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pyignite.api import APIResult +from pyignite.connection import AioConnection, Connection +from pyignite.datatypes import Byte +from pyignite.queries import Query, query_perform +from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE + + +def cluster_get_state(connection: 'Connection', query_id=None) -> 'APIResult': + """ + Get cluster state. + + :param connection: Connection to use, + :param query_id: (optional) a value generated by client and returned as-is + in response.query_id. When the parameter is omitted, a random value + is generated, + :return: API result data object. Contains zero status and a state + retrieved on success, non-zero status and an error description on failure. + """ + return __cluster_get_state(connection, query_id) + + +async def cluster_get_state_async(connection: 'AioConnection', query_id=None) -> 'APIResult': + """ + Async version of cluster_get_state + """ + return await __cluster_get_state(connection, query_id) + + +def __post_process_get_state(result): + if result.status == 0: + result.value = result.value['state'] + return result + + +def __cluster_get_state(connection, query_id): + query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id) + return query_perform( + query_struct, connection, + response_config=[('state', Byte)], + post_process_fun=__post_process_get_state + ) + + +def cluster_set_state(connection: 'Connection', state: int, query_id=None) -> 'APIResult': + """ + Set cluster state. + + :param connection: Connection to use, + :param state: State to set, + :param query_id: (optional) a value generated by client and returned as-is + in response.query_id. When the parameter is omitted, a random value + is generated, + :return: API result data object. Contains zero status if a value + is written, non-zero status and an error description otherwise. + """ + return __cluster_set_state(connection, state, query_id) + + +async def cluster_set_state_async(connection: 'AioConnection', state: int, query_id=None) -> 'APIResult': + """ + Async version of cluster_get_state + """ + return await __cluster_set_state(connection, state, query_id) + + +def __post_process_set_state(result): + if result.status == 0: + result.value = result.value['state'] + return result + + +def __cluster_set_state(connection, state, query_id): + query_struct = Query( + OP_CLUSTER_CHANGE_STATE, + [ + ('state', Byte) + ], + query_id=query_id + ) + return query_perform( + query_struct, connection, + query_params={ + 'state': state, + } + ) diff --git a/pyignite/client.py b/pyignite/client.py index 2f24c43..38f32f6 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -49,6 +49,7 @@ from .api import cache_get_node_partitions from .api.binary import get_binary_type, put_binary_type from .api.cache_config import cache_get_names +from .cluster import Cluster from .cursors import SqlFieldsCursor from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache from .connection import Connection @@ -727,3 +728,11 @@ def sql( return SqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout) + + def get_cluster(self) -> 'Cluster': + """ + Gets client cluster facade. + + :return: Client cluster facade. + """ + return Cluster(self) diff --git a/pyignite/cluster.py b/pyignite/cluster.py new file mode 100644 index 0000000..09bd185 --- /dev/null +++ b/pyignite/cluster.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains `Cluster` that lets you get info and change state of the +whole cluster. +""" +from pyignite import Client +from pyignite.api.cluster import cluster_get_state, cluster_set_state + + +class Cluster: + """ + Ignite cluster abstraction. Users should never use this class directly, + but construct its instances with + :py:meth:`~pyignite.client.Client.get_cluster` method instead. + """ + + def __init__(self, client: 'Client'): + self._client = client + + def get_state(self): + """ + Gets current cluster state. + + :return: Current cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return cluster_get_state(self._client.random_node) + + def set_state(self, state): + """ + Changes current cluster state to the given. + + Note: Deactivation clears in-memory caches (without persistence) + including the system caches. + + :param state: New cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return cluster_set_state(self._client.random_node, state) diff --git a/pyignite/datatypes/cluster_state.py b/pyignite/datatypes/cluster_state.py new file mode 100644 index 0000000..863a1d2 --- /dev/null +++ b/pyignite/datatypes/cluster_state.py @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import IntEnum + + +class ClusterState(IntEnum): + #: Cluster deactivated. Cache operations aren't allowed. + INACTIVE = 0 + + #: Cluster activated. All cache operations are allowed. + ACTIVE = 1 + + #: Cluster activated. Cache read operation allowed, Cache data change operation + #: aren't allowed. + ACTIVE_READ_ONLY = 2 diff --git a/pyignite/queries/op_codes.py b/pyignite/queries/op_codes.py index 7372713..c152f7c 100644 --- a/pyignite/queries/op_codes.py +++ b/pyignite/queries/op_codes.py @@ -61,7 +61,10 @@ OP_QUERY_SQL_FIELDS = 2004 OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE = 2005 -P_GET_BINARY_TYPE_NAME = 3000 +OP_GET_BINARY_TYPE_NAME = 3000 OP_REGISTER_BINARY_TYPE_NAME = 3001 OP_GET_BINARY_TYPE = 3002 OP_PUT_BINARY_TYPE = 3003 + +OP_CLUSTER_GET_STATE = 5000 +OP_CLUSTER_CHANGE_STATE = 5001 diff --git a/pyignite/stream/aio_cluster.py b/pyignite/stream/aio_cluster.py new file mode 100644 index 0000000..8a2f98e --- /dev/null +++ b/pyignite/stream/aio_cluster.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains `AioCluster` that lets you get info and change state of the +whole cluster. +""" +from pyignite import AioClient +from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async + + +class AioCluster: + """ + Ignite cluster abstraction. Users should never use this class directly, + but construct its instances with + :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead. + """ + + def __init__(self, client: 'AioClient'): + self._client = client + + async def get_state(self): + """ + Gets current cluster state. + + :return: Current cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return await cluster_get_state_async(await self._client.random_node()) + + async def set_state(self, state): + """ + Changes current cluster state to the given. + + Note: Deactivation clears in-memory caches (without persistence) + including the system caches. + + :param state: New cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return await cluster_set_state_async(await self._client.random_node(), state) From c0fed2515499a42e322592bf762c83af3a81c663 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 03:45:54 +0300 Subject: [PATCH 02/28] IGNITE-14465: Protocol context introduced New protocol versions supported --- pyignite/aio_client.py | 4 +- pyignite/api/cluster.py | 6 +++ pyignite/client.py | 24 +++++------ pyignite/connection/aio_connection.py | 16 ++++--- pyignite/connection/bitmask_feature.py | 57 +++++++++++++++++++++++++ pyignite/connection/connection.py | 42 ++++++------------ pyignite/connection/handshake.py | 45 +++++++++++++------ pyignite/connection/protocol_context.py | 53 +++++++++++++++++++++++ pyignite/constants.py | 7 ++- pyignite/queries/query.py | 4 +- pyignite/queries/response.py | 7 +-- 11 files changed, 195 insertions(+), 70 deletions(-) create mode 100644 pyignite/connection/bitmask_feature.py create mode 100644 pyignite/connection/protocol_context.py diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py index f260e39..8b99c70 100644 --- a/pyignite/aio_client.py +++ b/pyignite/aio_client.py @@ -92,7 +92,7 @@ async def _connect(self, nodes): if not self.partition_aware: try: - if self.protocol_version is None: + if self.protocol_context is None: # open connection before adding to the pool await conn.connect() @@ -120,7 +120,7 @@ async def _connect(self, nodes): await asyncio.gather(*reconnect_coro, return_exceptions=True) - if self.protocol_version is None: + if self.protocol_context is None: raise ReconnectError('Can not connect.') async def close(self): diff --git a/pyignite/api/cluster.py b/pyignite/api/cluster.py index f1555e7..051432d 100644 --- a/pyignite/api/cluster.py +++ b/pyignite/api/cluster.py @@ -12,10 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import warnings from pyignite.api import APIResult from pyignite.connection import AioConnection, Connection from pyignite.datatypes import Byte +from pyignite.datatypes.cluster_state import ClusterState from pyignite.queries import Query, query_perform from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE @@ -85,6 +87,10 @@ def __post_process_set_state(result): def __cluster_set_state(connection, state, query_id): + if state == ClusterState.ACTIVE_READ_ONLY and \ + not connection.protocol_context.if_cluster_api_supported(): + warnings.warn(f'ClusterState.ACTIVE_READ_ONLY is not supported by the cluster', category=RuntimeWarning) + query_struct = Query( OP_CLUSTER_CHANGE_STATE, [ diff --git a/pyignite/client.py b/pyignite/client.py index 38f32f6..107c96c 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -84,24 +84,23 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, * self._partition_aware = partition_aware self.affinity_version = (0, 0) self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)} - self._protocol_version = None + self._protocol_context = None @property - def protocol_version(self): + def protocol_context(self): """ - Returns the tuple of major, minor, and revision numbers of the used - thin protocol version, or None, if no connection to the Ignite cluster - was not yet established. + Returns protocol context, or None, if no connection to the Ignite + cluster was not yet established. This method is not a part of the public API. Unless you wish to extend the `pyignite` capabilities (with additional testing, logging, examining connections, et c.) you probably should not use it. """ - return self._protocol_version + return self._protocol_context - @protocol_version.setter - def protocol_version(self, value): - self._protocol_version = value + @protocol_context.setter + def protocol_context(self, value): + self._protocol_context = value @property def partition_aware(self): @@ -109,7 +108,8 @@ def partition_aware(self): @property def partition_awareness_supported_by_protocol(self): - return self.protocol_version is not None and self.protocol_version >= (1, 4, 0) + return self.protocol_context is not None \ + and self.protocol_context.is_partition_awareness_supported() @property def compact_footer(self) -> bool: @@ -380,7 +380,7 @@ def _connect(self, nodes): conn = Connection(self, host, port, **self._connection_args) try: - if self.protocol_version is None or self.partition_aware: + if self.protocol_context is None or self.partition_aware: # open connection before adding to the pool conn.connect() @@ -397,7 +397,7 @@ def _connect(self, nodes): self._nodes.append(conn) - if self.protocol_version is None: + if self.protocol_context is None: raise ReconnectError('Can not connect.') def close(self): diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index e5c11da..fcf2b4b 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -36,9 +36,11 @@ from pyignite.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER from pyignite.exceptions import HandshakeError, SocketError, connection_errors +from .bitmask_feature import all_supported_features from .connection import BaseConnection from .handshake import HandshakeRequest, HandshakeResponse +from .protocol_context import ProtocolContext from .ssl import create_ssl_context from ..stream import AioBinaryStream @@ -112,22 +114,22 @@ async def _connect(self) -> Union[dict, OrderedDict]: detecting_protocol = False # choose highest version first - if self.client.protocol_version is None: + if self.client.protocol_context is None: detecting_protocol = True - self.client.protocol_version = max(PROTOCOLS) + self.client.protocol_context = ProtocolContext(max(PROTOCOLS), all_supported_features()) try: result = await self._connect_version() except HandshakeError as e: if e.expected_version in PROTOCOLS: - self.client.protocol_version = e.expected_version + self.client.protocol_context.version = e.expected_version result = await self._connect_version() else: raise e except connection_errors: # restore undefined protocol version if detecting_protocol: - self.client.protocol_version = None + self.client.protocol_context = None raise # connection is ready for end user @@ -145,10 +147,10 @@ async def _connect_version(self) -> Union[dict, OrderedDict]: ssl_context = create_ssl_context(self.ssl_params) self._reader, self._writer = await asyncio.open_connection(self.host, self.port, ssl=ssl_context) - protocol_version = self.client.protocol_version + protocol_context = self.client.protocol_context hs_request = HandshakeRequest( - protocol_version, + protocol_context, self.username, self.password ) @@ -158,7 +160,7 @@ async def _connect_version(self) -> Union[dict, OrderedDict]: await self._send(stream.getbuffer(), reconnect=False) with AioBinaryStream(self.client, await self._recv(reconnect=False)) as stream: - hs_response = await HandshakeResponse.parse_async(stream, self.protocol_version) + hs_response = await HandshakeResponse.parse_async(stream, self.protocol_context) if hs_response.op_code == 0: self._close() diff --git a/pyignite/connection/bitmask_feature.py b/pyignite/connection/bitmask_feature.py new file mode 100644 index 0000000..8c82044 --- /dev/null +++ b/pyignite/connection/bitmask_feature.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from enum import IntEnum + +from pyignite.constants import PROTOCOL_BYTE_ORDER + + +class BitmaskFeature(IntEnum): + CLUSTER_API = 2 + + +def feature_flags_as_bytes(features: [int]) -> bytearray: + """ + Convert feature flags array to bytearray bitmask. + + :param features: Features list, + :return: Bitmask as bytearray. + """ + value = 0 + for feature in features: + value |= (1 << feature) + + bytes_num = max(features) / 8 + 1 + + return bytearray(value.to_bytes(bytes_num, byteorder=PROTOCOL_BYTE_ORDER)) + + +def all_supported_features() -> [int]: + """ + Get all supported features. + + :return: List of supported features. + """ + return [f.value for f in BitmaskFeature] + + +def all_supported_feature_flags_as_bytes() -> bytearray: + """ + Get all supported features as bytearray bitmask. + + :return: Bitmask as bytearray. + """ + return feature_flags_as_bytes(all_supported_features()) diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index 901cb56..4f1bf9b 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -13,29 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - from collections import OrderedDict import socket from typing import Union from pyignite.constants import PROTOCOLS, IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError +from .bitmask_feature import all_supported_features from .handshake import HandshakeRequest, HandshakeResponse +from .protocol_context import ProtocolContext from .ssl import wrap, check_ssl_params from ..stream import BinaryStream @@ -83,19 +70,18 @@ def __repr__(self) -> str: return '{}:{}'.format(self.host or '?', self.port or '?') @property - def protocol_version(self): + def protocol_context(self): """ - Returns the tuple of major, minor, and revision numbers of the used - thin protocol version, or None, if no connection to the Ignite cluster - was yet established. + Returns protocol context, or None, if no connection to the Ignite + cluster was yet established. """ - return self.client.protocol_version + return self.client.protocol_context def _process_handshake_error(self, response): error_text = f'Handshake error: {response.message}' # if handshake fails for any reason other than protocol mismatch # (i.e. authentication error), server version is 0.0.0 - protocol_version = self.client.protocol_version + protocol_version = self.client.protocol_context.version server_version = (response.version_major, response.version_minor, response.version_patch) if any(server_version): @@ -180,22 +166,22 @@ def connect(self) -> Union[dict, OrderedDict]: detecting_protocol = False # choose highest version first - if self.client.protocol_version is None: + if self.client.protocol_context is None: detecting_protocol = True - self.client.protocol_version = max(PROTOCOLS) + self.client.protocol_context = ProtocolContext(max(PROTOCOLS), all_supported_features()) try: result = self._connect_version() except HandshakeError as e: if e.expected_version in PROTOCOLS: - self.client.protocol_version = e.expected_version + self.client.protocol_context.version = e.expected_version result = self._connect_version() else: raise e except connection_errors: # restore undefined protocol version if detecting_protocol: - self.client.protocol_version = None + self.client.protocol_context = None raise # connection is ready for end user @@ -214,10 +200,10 @@ def _connect_version(self) -> Union[dict, OrderedDict]: self._socket = wrap(self._socket, self.ssl_params) self._socket.connect((self.host, self.port)) - protocol_version = self.client.protocol_version + protocol_context = self.client.protocol_context hs_request = HandshakeRequest( - protocol_version, + protocol_context, self.username, self.password ) @@ -227,7 +213,7 @@ def _connect_version(self) -> Union[dict, OrderedDict]: self.send(stream.getbuffer(), reconnect=False) with BinaryStream(self.client, self.recv(reconnect=False)) as stream: - hs_response = HandshakeResponse.parse(stream, self.protocol_version) + hs_response = HandshakeResponse.parse(stream, self.protocol_context) if hs_response.op_code == 0: self.close() diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py index 0b0fe50..c6b0bd1 100644 --- a/pyignite/connection/handshake.py +++ b/pyignite/connection/handshake.py @@ -13,9 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, Tuple +from typing import Optional -from pyignite.datatypes import Byte, Int, Short, String, UUIDObject +from pyignite.connection.bitmask_feature import feature_flags_as_bytes +from pyignite.connection.protocol_context import ProtocolContext +from pyignite.datatypes import Byte, Int, Short, String, UUIDObject, ByteArrayObject from pyignite.datatypes.internal import Struct from pyignite.stream import READ_BACKWARD @@ -27,10 +29,10 @@ class HandshakeRequest: handshake_struct = None username = None password = None - protocol_version = None + protocol_context = None def __init__( - self, protocol_version: Tuple[int, int, int], + self, protocol_context: 'ProtocolContext', username: Optional[str] = None, password: Optional[str] = None ): fields = [ @@ -41,7 +43,11 @@ def __init__( ('version_patch', Short), ('client_code', Byte), ] - self.protocol_version = protocol_version + self.protocol_context = protocol_context + if self.protocol_context.is_feature_flags_supported(): + fields.extend([ + ('features', ByteArrayObject), + ]) if username and password: self.username = username self.password = password @@ -58,14 +64,20 @@ async def from_python_async(self, stream): await self.handshake_struct.from_python_async(stream, self.__create_handshake_data()) def __create_handshake_data(self): + version = self.protocol_context.version handshake_data = { 'length': 8, 'op_code': OP_HANDSHAKE, - 'version_major': self.protocol_version[0], - 'version_minor': self.protocol_version[1], - 'version_patch': self.protocol_version[2], + 'version_major': version[0], + 'version_minor': version[1], + 'version_patch': version[2], 'client_code': 2, # fixed value defined by protocol } + if self.protocol_context.is_feature_flags_supported(): + handshake_data.update({ + 'features': feature_flags_as_bytes(self.protocol_context.features), + }) + handshake_data['length'] += 1 if self.username and self.password: handshake_data.update({ 'username': self.username, @@ -96,12 +108,12 @@ def __getattr__(self, item): return self.get(item) @classmethod - def parse(cls, stream, protocol_version): + def parse(cls, stream, protocol_context): start_class = cls.__response_start.parse(stream) start = stream.read_ctype(start_class, direction=READ_BACKWARD) data = cls.__response_start.to_python(start) - response_end = cls.__create_response_end(data, protocol_version) + response_end = cls.__create_response_end(data, protocol_context) if response_end: end_class = response_end.parse(stream) end = stream.read_ctype(end_class, direction=READ_BACKWARD) @@ -110,12 +122,12 @@ def parse(cls, stream, protocol_version): return cls(data) @classmethod - async def parse_async(cls, stream, protocol_version): + async def parse_async(cls, stream, protocol_context): start_class = cls.__response_start.parse(stream) start = stream.read_ctype(start_class, direction=READ_BACKWARD) data = await cls.__response_start.to_python_async(start) - response_end = cls.__create_response_end(data, protocol_version) + response_end = cls.__create_response_end(data, protocol_context) if response_end: end_class = await response_end.parse_async(stream) end = stream.read_ctype(end_class, direction=READ_BACKWARD) @@ -124,7 +136,7 @@ async def parse_async(cls, stream, protocol_version): return cls(data) @classmethod - def __create_response_end(cls, start_data, protocol_version): + def __create_response_end(cls, start_data, protocol_context): response_end = None if start_data['op_code'] == 0: response_end = Struct([ @@ -134,7 +146,12 @@ def __create_response_end(cls, start_data, protocol_version): ('message', String), ('client_status', Int) ]) - elif protocol_version >= (1, 4, 0): + elif protocol_context.is_feature_flags_supported(): + response_end = Struct([ + ('features', ByteArrayObject), + ('node_uuid', UUIDObject), + ]) + elif protocol_context.is_partition_awareness_supported(): response_end = Struct([ ('node_uuid', UUIDObject), ]) diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py new file mode 100644 index 0000000..78713f7 --- /dev/null +++ b/pyignite/connection/protocol_context.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Tuple + +from pyignite.connection.bitmask_feature import BitmaskFeature + + +class ProtocolContext: + """ + Protocol context. Provides ability to easily check supported supported + protocol features. + """ + + def __init__(self, version: Tuple[int, int, int], features: [int] = None): + self.version = version + self.features = features + + def is_partition_awareness_supported(self) -> bool: + """ + Check whether partition awareness supported by the current protocol. + """ + return self.version >= (1, 4, 0) + + def is_status_flags_supported(self) -> bool: + """ + Check whether status flags supported by the current protocol. + """ + return self.version >= (1, 4, 0) + + def is_feature_flags_supported(self) -> bool: + """ + Check whether feature flags supported by the current protocol. + """ + return self.version >= (1, 7, 0) + + def if_cluster_api_supported(self) -> bool: + """ + Check whether cluster API supported by the current protocol. + """ + return BitmaskFeature.CLUSTER_API in self.features diff --git a/pyignite/constants.py b/pyignite/constants.py index 02f7124..c08a3ce 100644 --- a/pyignite/constants.py +++ b/pyignite/constants.py @@ -31,14 +31,17 @@ ] PROTOCOLS = { + (1, 7, 0), + (1, 6, 0), + (1, 5, 0), (1, 4, 0), (1, 3, 0), (1, 2, 0), } PROTOCOL_VERSION_MAJOR = 1 -PROTOCOL_VERSION_MINOR = 4 -PROTOCOL_VERSION_PATCH = 0 +PROTOCOL_VERSION_MINOR = 7 +PROTOCOL_VERSION_PATCH = 1 MAX_LONG = 9223372036854775807 MIN_LONG = -9223372036854775808 diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py index beea5d9..d9e6aaf 100644 --- a/pyignite/queries/query.py +++ b/pyignite/queries/query.py @@ -124,7 +124,7 @@ def perform( self.from_python(stream, query_params) response_data = conn.request(stream.getbuffer()) - response_struct = self.response_type(protocol_version=conn.protocol_version, + response_struct = self.response_type(protocol_context=conn.protocol_context, following=response_config, **kwargs) with BinaryStream(conn.client, response_data) as stream: @@ -156,7 +156,7 @@ async def perform_async( await self.from_python_async(stream, query_params) data = await conn.request(stream.getbuffer()) - response_struct = self.response_type(protocol_version=conn.protocol_version, + response_struct = self.response_type(protocol_context=conn.protocol_context, following=response_config, **kwargs) with AioBinaryStream(conn.client, data) as stream: diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py index 83a6e6a..2efa517 100644 --- a/pyignite/queries/response.py +++ b/pyignite/queries/response.py @@ -19,6 +19,7 @@ from collections import OrderedDict import ctypes +from pyignite.connection.protocol_context import ProtocolContext from pyignite.constants import RHF_TOPOLOGY_CHANGED, RHF_ERROR from pyignite.datatypes import AnyDataObject, Bool, Int, Long, String, StringArray, Struct from pyignite.datatypes.binary import body_struct, enum_struct, schema_struct @@ -29,7 +30,7 @@ @attr.s class Response: following = attr.ib(type=list, factory=list) - protocol_version = attr.ib(type=tuple, factory=tuple) + protocol_context = attr.ib(type=type(ProtocolContext), default=None) _response_header = None _response_class_name = 'Response' @@ -44,7 +45,7 @@ def __build_header(self): ('query_id', ctypes.c_longlong), ] - if self.protocol_version and self.protocol_version >= (1, 4, 0): + if self.protocol_context and self.protocol_context.is_status_flags_supported(): fields.append(('flags', ctypes.c_short)) else: fields.append(('status_code', ctypes.c_int),) @@ -68,7 +69,7 @@ def __parse_header(self, stream): fields = [] has_error = False - if self.protocol_version and self.protocol_version >= (1, 4, 0): + if self.protocol_context and self.protocol_context.is_status_flags_supported(): if header.flags & RHF_TOPOLOGY_CHANGED: fields = [ ('affinity_version', ctypes.c_longlong), From 76aca48d1640b35da625444284e32c74805aebed Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Thu, 1 Apr 2021 18:23:51 -0700 Subject: [PATCH 03/28] IGNITE-14465: Fix issues --- pyignite/cluster.py | 1 - pyignite/connection/bitmask_feature.py | 2 +- pyignite/connection/handshake.py | 8 ++++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pyignite/cluster.py b/pyignite/cluster.py index 09bd185..3a79f0e 100644 --- a/pyignite/cluster.py +++ b/pyignite/cluster.py @@ -17,7 +17,6 @@ This module contains `Cluster` that lets you get info and change state of the whole cluster. """ -from pyignite import Client from pyignite.api.cluster import cluster_get_state, cluster_set_state diff --git a/pyignite/connection/bitmask_feature.py b/pyignite/connection/bitmask_feature.py index 8c82044..83853b2 100644 --- a/pyignite/connection/bitmask_feature.py +++ b/pyignite/connection/bitmask_feature.py @@ -34,7 +34,7 @@ def feature_flags_as_bytes(features: [int]) -> bytearray: for feature in features: value |= (1 << feature) - bytes_num = max(features) / 8 + 1 + bytes_num = max(features) // 8 + 1 return bytearray(value.to_bytes(bytes_num, byteorder=PROTOCOL_BYTE_ORDER)) diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py index c6b0bd1..1251583 100644 --- a/pyignite/connection/handshake.py +++ b/pyignite/connection/handshake.py @@ -74,10 +74,14 @@ def __create_handshake_data(self): 'client_code': 2, # fixed value defined by protocol } if self.protocol_context.is_feature_flags_supported(): + features = feature_flags_as_bytes(self.protocol_context.features) handshake_data.update({ - 'features': feature_flags_as_bytes(self.protocol_context.features), + 'features': features, }) - handshake_data['length'] += 1 + handshake_data['length'] += sum([ + 5, + len(features) + ]) if self.username and self.password: handshake_data.update({ 'username': self.username, From b1474827e94f672ea7d51483596786324429e09f Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 04:51:08 +0300 Subject: [PATCH 04/28] IGNITE-14465: Add tests --- tests/custom/conftest.py | 48 +++++++++++++++++++++++++ tests/custom/test_timezone.py | 68 +++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 tests/custom/conftest.py create mode 100644 tests/custom/test_timezone.py diff --git a/tests/custom/conftest.py b/tests/custom/conftest.py new file mode 100644 index 0000000..5601886 --- /dev/null +++ b/tests/custom/conftest.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from pyignite import Client, AioClient +from tests.util import start_ignite + + +@pytest.fixture(scope='module') +def start_ignite_server(): + def start(idx=1, debug=False, use_ssl=False, use_auth=False): + return start_ignite( + idx=idx, + debug=debug, + use_ssl=use_ssl, + use_auth=use_auth + ) + + return start + + +@pytest.fixture(scope='module') +def start_client(): + def start(**kwargs): + return Client(**kwargs) + + return start + + +@pytest.fixture(scope='module') +def start_async_client(): + def start(**kwargs): + return AioClient(**kwargs) + + return start diff --git a/tests/custom/test_timezone.py b/tests/custom/test_timezone.py new file mode 100644 index 0000000..d2c56f4 --- /dev/null +++ b/tests/custom/test_timezone.py @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from tests.util import kill_process_tree + +from pyignite.datatypes.cluster_state import ClusterState + + +def test_cluster_set_active(start_ignite_server, start_client): + server1 = start_ignite_server(idx=1) + server2 = start_ignite_server(idx=2) + try: + client = start_client() + + with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): + cluster = client.get_cluster() + + assert cluster.get_state() == ClusterState.ACTIVE + + cluster.set_state(ClusterState.ACTIVE_READ_ONLY) + assert cluster.get_state() == ClusterState.ACTIVE_READ_ONLY + + cluster.set_state(ClusterState.INACTIVE) + assert cluster.get_state() == ClusterState.INACTIVE + + cluster.set_state(ClusterState.ACTIVE) + assert cluster.get_state() == ClusterState.ACTIVE + finally: + kill_process_tree(server1.pid) + kill_process_tree(server2.pid) + + +@pytest.mark.asyncio +async def test_cluster_set_active_async(start_ignite_server, start_async_client): + server1 = start_ignite_server(idx=1) + server2 = start_ignite_server(idx=2) + try: + client = await start_async_client() + with await client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): + cluster = await client.get_cluster() + + assert await cluster.get_state() == ClusterState.ACTIVE + + await cluster.set_state(ClusterState.ACTIVE_READ_ONLY) + assert await cluster.get_state() == ClusterState.ACTIVE_READ_ONLY + + await cluster.set_state(ClusterState.INACTIVE) + assert await cluster.get_state() == ClusterState.INACTIVE + + await cluster.set_state(ClusterState.ACTIVE) + assert await cluster.get_state() == ClusterState.ACTIVE + finally: + kill_process_tree(server1.pid) + kill_process_tree(server2.pid) From eda90a482048daf33b328b9611d992b00a6e2f49 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 05:02:16 +0300 Subject: [PATCH 05/28] IGNITE-14465: Fixes --- pyignite/aio_client.py | 1 + pyignite/aio_cluster.py | 56 +++++++++++++++++++ pyignite/cluster.py | 4 ++ pyignite/exceptions.py | 10 +++- .../{test_timezone.py => test_cluster.py} | 0 5 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 pyignite/aio_cluster.py rename tests/custom/{test_timezone.py => test_cluster.py} (100%) diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py index 8b99c70..1870878 100644 --- a/pyignite/aio_client.py +++ b/pyignite/aio_client.py @@ -17,6 +17,7 @@ from itertools import chain from typing import Iterable, Type, Union, Any, Dict +from .aio_cluster import AioCluster from .api import cache_get_node_partitions_async from .api.binary import get_binary_type_async, put_binary_type_async from .api.cache_config import cache_get_names_async diff --git a/pyignite/aio_cluster.py b/pyignite/aio_cluster.py new file mode 100644 index 0000000..effd383 --- /dev/null +++ b/pyignite/aio_cluster.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains `AioCluster` that lets you get info and change state of the +whole cluster asynchronously. +""" +from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async +from pyignite.exceptions import ClusterError +from pyignite.utils import status_to_exception + + +class AioCluster: + """ + Ignite cluster abstraction. Users should never use this class directly, + but construct its instances with + :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead. + """ + + def __init__(self, client: 'AioClient'): + self._client = client + + @status_to_exception(ClusterError) + async def get_state(self): + """ + Gets current cluster state. + + :return: Current cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return await cluster_get_state_async(self._client.random_node) + + @status_to_exception(ClusterError) + async def set_state(self, state): + """ + Changes current cluster state to the given. + + Note: Deactivation clears in-memory caches (without persistence) + including the system caches. + + :param state: New cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return await cluster_set_state_async(self._client.random_node, state) diff --git a/pyignite/cluster.py b/pyignite/cluster.py index 3a79f0e..f10afe4 100644 --- a/pyignite/cluster.py +++ b/pyignite/cluster.py @@ -18,6 +18,8 @@ whole cluster. """ from pyignite.api.cluster import cluster_get_state, cluster_set_state +from pyignite.exceptions import ClusterError +from pyignite.utils import status_to_exception class Cluster: @@ -30,6 +32,7 @@ class Cluster: def __init__(self, client: 'Client'): self._client = client + @status_to_exception(ClusterError) def get_state(self): """ Gets current cluster state. @@ -39,6 +42,7 @@ def get_state(self): """ return cluster_get_state(self._client.random_node) + @status_to_exception(ClusterError) def set_state(self, state): """ Changes current cluster state to the given. diff --git a/pyignite/exceptions.py b/pyignite/exceptions.py index 579aa29..0919fd4 100644 --- a/pyignite/exceptions.py +++ b/pyignite/exceptions.py @@ -65,7 +65,7 @@ class ParameterError(Exception): class CacheError(Exception): """ - This exception is raised, whenever any remote Thin client operation + This exception is raised, whenever any remote Thin client cache operation returns an error. """ pass @@ -93,4 +93,12 @@ class SQLError(CacheError): pass +class ClusterError(Exception): + """ + This exception is raised, whenever any remote Thin client cluster operation + returns an error. + """ + pass + + connection_errors = (IOError, OSError, EOFError) diff --git a/tests/custom/test_timezone.py b/tests/custom/test_cluster.py similarity index 100% rename from tests/custom/test_timezone.py rename to tests/custom/test_cluster.py From 57b0db9eafd28300796c2466f5265619d74bb898 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Thu, 1 Apr 2021 19:14:21 -0700 Subject: [PATCH 06/28] IGNITE-14465: Fix async tests --- pyignite/aio_cluster.py | 4 ++-- tests/custom/test_cluster.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pyignite/aio_cluster.py b/pyignite/aio_cluster.py index effd383..6d76125 100644 --- a/pyignite/aio_cluster.py +++ b/pyignite/aio_cluster.py @@ -40,7 +40,7 @@ async def get_state(self): :return: Current cluster state. This is one of ClusterState.INACTIVE, ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. """ - return await cluster_get_state_async(self._client.random_node) + return await cluster_get_state_async(await self._client.random_node()) @status_to_exception(ClusterError) async def set_state(self, state): @@ -53,4 +53,4 @@ async def set_state(self, state): :param state: New cluster state. This is one of ClusterState.INACTIVE, ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. """ - return await cluster_set_state_async(self._client.random_node, state) + return await cluster_set_state_async(await self._client.random_node(), state) diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index d2c56f4..e9bed3b 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -49,9 +49,9 @@ async def test_cluster_set_active_async(start_ignite_server, start_async_client) server1 = start_ignite_server(idx=1) server2 = start_ignite_server(idx=2) try: - client = await start_async_client() - with await client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): - cluster = await client.get_cluster() + client = start_async_client() + async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): + cluster = client.get_cluster() assert await cluster.get_state() == ClusterState.ACTIVE From caaa7ecc826b807b485cc8f7325864f96ed2b00e Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Thu, 1 Apr 2021 19:26:50 -0700 Subject: [PATCH 07/28] IGNITE-14465: Improve tests --- tests/custom/test_cluster.py | 44 +++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index e9bed3b..3594986 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -15,30 +15,49 @@ import pytest +from pyignite.exceptions import CacheError from tests.util import kill_process_tree from pyignite.datatypes.cluster_state import ClusterState def test_cluster_set_active(start_ignite_server, start_client): + key = 42 + val = 42 + server1 = start_ignite_server(idx=1) server2 = start_ignite_server(idx=2) try: client = start_client() - with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): cluster = client.get_cluster() - assert cluster.get_state() == ClusterState.ACTIVE + cache = client.get_or_create_cache("test_cache") + cache.put(key, val) + assert cache.get(key) == val + cluster.set_state(ClusterState.ACTIVE_READ_ONLY) assert cluster.get_state() == ClusterState.ACTIVE_READ_ONLY + assert cache.get(key) == val + with pytest.raises(CacheError): + cache.put(key, val + 1) + cluster.set_state(ClusterState.INACTIVE) assert cluster.get_state() == ClusterState.INACTIVE + with pytest.raises(CacheError): + cache.get(key) + + with pytest.raises(CacheError): + cache.put(key, val + 1) + cluster.set_state(ClusterState.ACTIVE) assert cluster.get_state() == ClusterState.ACTIVE + + cache.put(key, val + 2) + assert cache.get(key) == val + 2 finally: kill_process_tree(server1.pid) kill_process_tree(server2.pid) @@ -46,23 +65,42 @@ def test_cluster_set_active(start_ignite_server, start_client): @pytest.mark.asyncio async def test_cluster_set_active_async(start_ignite_server, start_async_client): + key = 42 + val = 42 + server1 = start_ignite_server(idx=1) server2 = start_ignite_server(idx=2) try: client = start_async_client() async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): cluster = client.get_cluster() - assert await cluster.get_state() == ClusterState.ACTIVE + cache = await client.get_or_create_cache("test_cache") + await cache.put(key, val) + assert await cache.get(key) == val + await cluster.set_state(ClusterState.ACTIVE_READ_ONLY) assert await cluster.get_state() == ClusterState.ACTIVE_READ_ONLY + assert await cache.get(key) == val + with pytest.raises(CacheError): + await cache.put(key, val + 1) + await cluster.set_state(ClusterState.INACTIVE) assert await cluster.get_state() == ClusterState.INACTIVE + with pytest.raises(CacheError): + await cache.get(key) + + with pytest.raises(CacheError): + await cache.put(key, val + 1) + await cluster.set_state(ClusterState.ACTIVE) assert await cluster.get_state() == ClusterState.ACTIVE + + await cache.put(key, val + 2) + assert await cache.get(key) == val + 2 finally: kill_process_tree(server1.pid) kill_process_tree(server2.pid) From d15a2fb3318490d90662e3f4f15b68a400934400 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 12:03:22 +0300 Subject: [PATCH 08/28] IGNITE-14465: Fix tests config --- tests/config/ignite-config.xml.jinja2 | 5 ++--- tests/custom/conftest.py | 5 +++-- tests/custom/test_cluster.py | 19 +++++++++++-------- tests/util.py | 14 +++++++++++--- 4 files changed, 27 insertions(+), 16 deletions(-) diff --git a/tests/config/ignite-config.xml.jinja2 b/tests/config/ignite-config.xml.jinja2 index 2bf5129..325a581 100644 --- a/tests/config/ignite-config.xml.jinja2 +++ b/tests/config/ignite-config.xml.jinja2 @@ -31,7 +31,7 @@ - {% if use_auth %} + {% if use_persistence %} {% endif %} @@ -51,9 +51,8 @@ {% endif %} - {% if use_ssl %} - + {% endif %} diff --git a/tests/custom/conftest.py b/tests/custom/conftest.py index 5601886..1d85d4a 100644 --- a/tests/custom/conftest.py +++ b/tests/custom/conftest.py @@ -21,12 +21,13 @@ @pytest.fixture(scope='module') def start_ignite_server(): - def start(idx=1, debug=False, use_ssl=False, use_auth=False): + def start(idx=1, debug=False, use_ssl=False, use_auth=False, use_persistence=False): return start_ignite( idx=idx, debug=debug, use_ssl=use_ssl, - use_auth=use_auth + use_auth=use_auth, + use_persistence=use_persistence, ) return start diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index 3594986..15e036d 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -21,12 +21,16 @@ from pyignite.datatypes.cluster_state import ClusterState -def test_cluster_set_active(start_ignite_server, start_client): +@pytest.fixture(params=['with-persistence', 'without-persistence']) +def with_persistence(request): + yield request.param == 'with-persistence' + + +def test_cluster_set_active(start_ignite_server, start_client, with_persistence): key = 42 val = 42 - - server1 = start_ignite_server(idx=1) - server2 = start_ignite_server(idx=2) + server1 = start_ignite_server(idx=1, use_persistence=with_persistence) + server2 = start_ignite_server(idx=2, use_persistence=with_persistence) try: client = start_client() with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): @@ -64,12 +68,11 @@ def test_cluster_set_active(start_ignite_server, start_client): @pytest.mark.asyncio -async def test_cluster_set_active_async(start_ignite_server, start_async_client): +async def test_cluster_set_active_async(start_ignite_server, start_async_client, with_persistence): key = 42 val = 42 - - server1 = start_ignite_server(idx=1) - server2 = start_ignite_server(idx=2) + server1 = start_ignite_server(idx=1, use_persistence=with_persistence) + server2 = start_ignite_server(idx=2, use_persistence=with_persistence) try: client = start_async_client() async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): diff --git a/tests/util.py b/tests/util.py index 5651739..87283fb 100644 --- a/tests/util.py +++ b/tests/util.py @@ -155,7 +155,7 @@ def create_config_file(tpl_name, file_name, **kwargs): f.write(template.render(**kwargs)) -def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False): +def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False, use_persistence=False): clear_logs(idx) runner = get_ignite_runner() @@ -166,8 +166,16 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False): env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \ "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 " - params = {'ignite_instance_idx': str(idx), 'ignite_client_port': 10800 + idx, 'use_ssl': use_ssl, - 'use_auth': use_auth} + if use_auth: + use_persistence = True + + params = { + 'ignite_instance_idx': str(idx), + 'ignite_client_port': 10800 + idx, + 'use_ssl': use_ssl, + 'use_auth': use_auth, + 'use_persistence': use_persistence, + } create_config_file('log4j.xml.jinja2', f'log4j-{idx}.xml', **params) create_config_file('ignite-config.xml.jinja2', f'ignite-config-{idx}.xml', **params) From a8b509cae01e91876c84476826d6ad9df24aec3c Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 14:43:30 +0300 Subject: [PATCH 09/28] IGNITE-14465: BitmaskFeature(IntEnum -> IntFlag) --- pyignite/connection/aio_connection.py | 4 +- pyignite/connection/bitmask_feature.py | 53 ++++++++++---------------- pyignite/connection/connection.py | 4 +- pyignite/connection/handshake.py | 3 +- 4 files changed, 25 insertions(+), 39 deletions(-) diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index fcf2b4b..abd5185 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -36,7 +36,7 @@ from pyignite.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER from pyignite.exceptions import HandshakeError, SocketError, connection_errors -from .bitmask_feature import all_supported_features +from .bitmask_feature import BitmaskFeature from .connection import BaseConnection from .handshake import HandshakeRequest, HandshakeResponse @@ -116,7 +116,7 @@ async def _connect(self) -> Union[dict, OrderedDict]: # choose highest version first if self.client.protocol_context is None: detecting_protocol = True - self.client.protocol_context = ProtocolContext(max(PROTOCOLS), all_supported_features()) + self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported()) try: result = await self._connect_version() diff --git a/pyignite/connection/bitmask_feature.py b/pyignite/connection/bitmask_feature.py index 83853b2..bec02a8 100644 --- a/pyignite/connection/bitmask_feature.py +++ b/pyignite/connection/bitmask_feature.py @@ -14,44 +14,31 @@ # limitations under the License. -from enum import IntEnum +from enum import IntFlag from pyignite.constants import PROTOCOL_BYTE_ORDER -class BitmaskFeature(IntEnum): - CLUSTER_API = 2 +class BitmaskFeature(IntFlag): + CLUSTER_API = 1 << 2 + def to_bytearray(self) -> bytearray: + """ + Convert feature flags array to bytearray bitmask. -def feature_flags_as_bytes(features: [int]) -> bytearray: - """ - Convert feature flags array to bytearray bitmask. + :return: Bitmask as bytearray. + """ + full_bytes = self.bit_length() // 8 + 1 + return bytearray(self.to_bytes(full_bytes, byteorder=PROTOCOL_BYTE_ORDER)) - :param features: Features list, - :return: Bitmask as bytearray. - """ - value = 0 - for feature in features: - value |= (1 << feature) + @staticmethod + def all_supported() -> 'BitmaskFeature': + """ + Get all supported features. - bytes_num = max(features) // 8 + 1 - - return bytearray(value.to_bytes(bytes_num, byteorder=PROTOCOL_BYTE_ORDER)) - - -def all_supported_features() -> [int]: - """ - Get all supported features. - - :return: List of supported features. - """ - return [f.value for f in BitmaskFeature] - - -def all_supported_feature_flags_as_bytes() -> bytearray: - """ - Get all supported features as bytearray bitmask. - - :return: Bitmask as bytearray. - """ - return feature_flags_as_bytes(all_supported_features()) + :return: All supported features. + """ + supported = BitmaskFeature(0) + for feature in BitmaskFeature: + supported |= feature + return supported diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index 4f1bf9b..f766059 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -19,7 +19,7 @@ from pyignite.constants import PROTOCOLS, IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError -from .bitmask_feature import all_supported_features +from .bitmask_feature import BitmaskFeature from .handshake import HandshakeRequest, HandshakeResponse from .protocol_context import ProtocolContext @@ -168,7 +168,7 @@ def connect(self) -> Union[dict, OrderedDict]: # choose highest version first if self.client.protocol_context is None: detecting_protocol = True - self.client.protocol_context = ProtocolContext(max(PROTOCOLS), all_supported_features()) + self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported()) try: result = self._connect_version() diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py index 1251583..ad5aaee 100644 --- a/pyignite/connection/handshake.py +++ b/pyignite/connection/handshake.py @@ -15,7 +15,6 @@ from typing import Optional -from pyignite.connection.bitmask_feature import feature_flags_as_bytes from pyignite.connection.protocol_context import ProtocolContext from pyignite.datatypes import Byte, Int, Short, String, UUIDObject, ByteArrayObject from pyignite.datatypes.internal import Struct @@ -74,7 +73,7 @@ def __create_handshake_data(self): 'client_code': 2, # fixed value defined by protocol } if self.protocol_context.is_feature_flags_supported(): - features = feature_flags_as_bytes(self.protocol_context.features) + features = self.protocol_context.features.to_bytearray() handshake_data.update({ 'features': features, }) From 3c26217810012f404371719b4cd779873dc874e5 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 14:50:27 +0300 Subject: [PATCH 10/28] IGNITE-14465: Fix for cluster tests --- tests/custom/test_cluster.py | 13 ++++++++++--- tests/util.py | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index 15e036d..cb0c462 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -16,7 +16,7 @@ import pytest from pyignite.exceptions import CacheError -from tests.util import kill_process_tree +from tests.util import kill_process_tree, clear_ignite_work_dir from pyignite.datatypes.cluster_state import ClusterState @@ -26,7 +26,14 @@ def with_persistence(request): yield request.param == 'with-persistence' -def test_cluster_set_active(start_ignite_server, start_client, with_persistence): +@pytest.fixture(scope='module', autouse=True) +def cleanup(): + clear_ignite_work_dir() + yield None + clear_ignite_work_dir() + + +def test_cluster_set_active(start_ignite_server, start_client, with_persistence, cleanup): key = 42 val = 42 server1 = start_ignite_server(idx=1, use_persistence=with_persistence) @@ -68,7 +75,7 @@ def test_cluster_set_active(start_ignite_server, start_client, with_persistence) @pytest.mark.asyncio -async def test_cluster_set_active_async(start_ignite_server, start_async_client, with_persistence): +async def test_cluster_set_active_async(start_ignite_server, start_async_client, with_persistence, cleanup): key = 42 val = 42 server1 = start_ignite_server(idx=1, use_persistence=with_persistence) diff --git a/tests/util.py b/tests/util.py index 87283fb..5e5dcfc 100644 --- a/tests/util.py +++ b/tests/util.py @@ -185,7 +185,7 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False, use_persiste srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir()) - started = wait_for_condition(lambda: check_server_started(idx), timeout=30) + started = wait_for_condition(lambda: check_server_started(idx), timeout=60) if started: return srv From c37542686dedf4974006ee83308e0748eeb5019d Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 14:55:09 +0300 Subject: [PATCH 11/28] IGNITE-14465: Fix tests --- tests/custom/test_cluster.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index cb0c462..40f626b 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -36,12 +36,18 @@ def cleanup(): def test_cluster_set_active(start_ignite_server, start_client, with_persistence, cleanup): key = 42 val = 42 + server1 = start_ignite_server(idx=1, use_persistence=with_persistence) server2 = start_ignite_server(idx=2, use_persistence=with_persistence) + + start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE try: client = start_client() with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): cluster = client.get_cluster() + assert cluster.get_state() == start_state + + cluster.set_state(ClusterState.ACTIVE) assert cluster.get_state() == ClusterState.ACTIVE cache = client.get_or_create_cache("test_cache") @@ -78,12 +84,18 @@ def test_cluster_set_active(start_ignite_server, start_client, with_persistence, async def test_cluster_set_active_async(start_ignite_server, start_async_client, with_persistence, cleanup): key = 42 val = 42 + server1 = start_ignite_server(idx=1, use_persistence=with_persistence) server2 = start_ignite_server(idx=2, use_persistence=with_persistence) + + start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE try: client = start_async_client() async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): cluster = client.get_cluster() + assert await cluster.get_state() == start_state + + await cluster.set_state(ClusterState.ACTIVE) assert await cluster.get_state() == ClusterState.ACTIVE cache = await client.get_or_create_cache("test_cache") From 1404bdb838c4b5bc7e5e6186825ab771edecb05f Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 05:38:30 -0700 Subject: [PATCH 12/28] IGNITE-14465: Changed default timeout to None --- pyignite/connection/connection.py | 2 +- tests/custom/test_cluster.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index f766059..9a93f87 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -104,7 +104,7 @@ class Connection(BaseConnection): * binary protocol connector. Encapsulates handshake and failover reconnection. """ - def __init__(self, client: 'Client', host: str, port: int, timeout: float = 2.0, + def __init__(self, client: 'Client', host: str, port: int, timeout: float = None, username: str = None, password: str = None, **ssl_params): """ Initialize connection. diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index 40f626b..8b7d770 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -26,7 +26,7 @@ def with_persistence(request): yield request.param == 'with-persistence' -@pytest.fixture(scope='module', autouse=True) +@pytest.fixture(autouse=True) def cleanup(): clear_ignite_work_dir() yield None From 942b61c9a2827beffe441e461e6c61c100ee0cb9 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 05:53:02 -0700 Subject: [PATCH 13/28] IGNITE-14465: Fix feature bitmask retrieval --- pyignite/connection/aio_connection.py | 2 +- pyignite/connection/bitmask_feature.py | 13 +++++++++++++ pyignite/connection/connection.py | 1 + pyignite/connection/protocol_context.py | 4 +++- 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index abd5185..3ee3fac 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -133,8 +133,8 @@ async def _connect(self) -> Union[dict, OrderedDict]: raise # connection is ready for end user + self.client.protocol_context.features = BitmaskFeature.from_array(result.get('features', None)) self.uuid = result.get('node_uuid', None) # version-specific (1.4+) - self.failed = False return result diff --git a/pyignite/connection/bitmask_feature.py b/pyignite/connection/bitmask_feature.py index bec02a8..0b7768d 100644 --- a/pyignite/connection/bitmask_feature.py +++ b/pyignite/connection/bitmask_feature.py @@ -15,6 +15,7 @@ from enum import IntFlag +from typing import Optional from pyignite.constants import PROTOCOL_BYTE_ORDER @@ -42,3 +43,15 @@ def all_supported() -> 'BitmaskFeature': for feature in BitmaskFeature: supported |= feature return supported + + @staticmethod + def from_array(features_array: bytearray) -> Optional['BitmaskFeature']: + """ + Get features from bytearray. + + :param features_array: Feature bitmask as array, + :return: Return features. + """ + if features_array is None: + return None + return BitmaskFeature.from_bytes(features_array, byteorder=PROTOCOL_BYTE_ORDER) diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index 9a93f87..52dd716 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -185,6 +185,7 @@ def connect(self) -> Union[dict, OrderedDict]: raise # connection is ready for end user + self.client.protocol_context.features = BitmaskFeature.from_array(result.get('features', None)) self.uuid = result.get('node_uuid', None) # version-specific (1.4+) self.failed = False return result diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py index 78713f7..088e075 100644 --- a/pyignite/connection/protocol_context.py +++ b/pyignite/connection/protocol_context.py @@ -50,4 +50,6 @@ def if_cluster_api_supported(self) -> bool: """ Check whether cluster API supported by the current protocol. """ - return BitmaskFeature.CLUSTER_API in self.features + return self.is_feature_flags_supported() and \ + self.features and \ + BitmaskFeature.CLUSTER_API in self.features From 17ccb312ffc532ba8f3dafae72d5e807293b9b96 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 06:16:30 -0700 Subject: [PATCH 14/28] IGNITE-14465: Made protocol context consistent --- pyignite/connection/aio_connection.py | 5 +-- pyignite/connection/connection.py | 5 +-- pyignite/connection/protocol_context.py | 47 +++++++++++++++++++++---- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index 3ee3fac..854cb6a 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -122,7 +122,7 @@ async def _connect(self) -> Union[dict, OrderedDict]: result = await self._connect_version() except HandshakeError as e: if e.expected_version in PROTOCOLS: - self.client.protocol_context.version = e.expected_version + self.client.protocol_context.set_version(e.expected_version) result = await self._connect_version() else: raise e @@ -133,7 +133,8 @@ async def _connect(self) -> Union[dict, OrderedDict]: raise # connection is ready for end user - self.client.protocol_context.features = BitmaskFeature.from_array(result.get('features', None)) + features = BitmaskFeature.from_array(result.get('features', None)) + self.client.protocol_context.try_set_features(features) self.uuid = result.get('node_uuid', None) # version-specific (1.4+) self.failed = False return result diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index 52dd716..e6b8938 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -174,7 +174,7 @@ def connect(self) -> Union[dict, OrderedDict]: result = self._connect_version() except HandshakeError as e: if e.expected_version in PROTOCOLS: - self.client.protocol_context.version = e.expected_version + self.client.protocol_context.set_version(e.expected_version) result = self._connect_version() else: raise e @@ -185,7 +185,8 @@ def connect(self) -> Union[dict, OrderedDict]: raise # connection is ready for end user - self.client.protocol_context.features = BitmaskFeature.from_array(result.get('features', None)) + features = BitmaskFeature.from_array(result.get('features', None)) + self.client.protocol_context.try_set_features(features) self.uuid = result.get('node_uuid', None) # version-specific (1.4+) self.failed = False return result diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py index 088e075..5fc3136 100644 --- a/pyignite/connection/protocol_context.py +++ b/pyignite/connection/protocol_context.py @@ -24,9 +24,46 @@ class ProtocolContext: protocol features. """ - def __init__(self, version: Tuple[int, int, int], features: [int] = None): - self.version = version - self.features = features + def __init__(self, version: Tuple[int, int, int], features: BitmaskFeature = None): + self._version = version + self._features = features + self._ensure_consistency() + + def _ensure_consistency(self): + if not self.is_feature_flags_supported(): + self._features = None + + @property + def version(self): + return getattr(self, '_version', None) + + def set_version(self, version: Tuple[int, int, int]): + """ + Set version. + + This call may result in features being reset to None if the protocol + version does not support feature masks. + + :param version: Version to set. + """ + self._version = version + self._ensure_consistency() + + @property + def features(self): + return getattr(self, '_features', None) + + def try_set_features(self, features: BitmaskFeature): + """ + Try and set new feature set. + + If features are not supported by the protocol, None is set as features + instead. + + :param features: Features to set. + """ + self._features = features + self._ensure_consistency() def is_partition_awareness_supported(self) -> bool: """ @@ -50,6 +87,4 @@ def if_cluster_api_supported(self) -> bool: """ Check whether cluster API supported by the current protocol. """ - return self.is_feature_flags_supported() and \ - self.features and \ - BitmaskFeature.CLUSTER_API in self.features + return self.features and BitmaskFeature.CLUSTER_API in self.features From 1db85808f217c52ae3286a327a0febbca172e0ce Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 06:26:55 -0700 Subject: [PATCH 15/28] IGNITE-14465: Make ProtocolContext hashable --- pyignite/connection/protocol_context.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py index 5fc3136..9fe252a 100644 --- a/pyignite/connection/protocol_context.py +++ b/pyignite/connection/protocol_context.py @@ -29,6 +29,15 @@ def __init__(self, version: Tuple[int, int, int], features: BitmaskFeature = Non self._features = features self._ensure_consistency() + def __hash__(self): + return hash((self._version, self._features)) + + def __eq__(self, other): + if isinstance(other, ProtocolContext): + return self.version == other.version and \ + self.features == other.features + return NotImplemented + def _ensure_consistency(self): if not self.is_feature_flags_supported(): self._features = None From 5e64f242dd647f37ebe87e77c59aaac0ba1cedc3 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 16:51:16 +0300 Subject: [PATCH 16/28] IGNITE-14465: Review fixes --- pyignite/api/cluster.py | 12 ++++++------ pyignite/exceptions.py | 8 ++++++++ pyignite/queries/response.py | 4 ++-- tests/custom/test_cluster.py | 2 +- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/pyignite/api/cluster.py b/pyignite/api/cluster.py index 051432d..c59be18 100644 --- a/pyignite/api/cluster.py +++ b/pyignite/api/cluster.py @@ -12,12 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import warnings - from pyignite.api import APIResult from pyignite.connection import AioConnection, Connection from pyignite.datatypes import Byte -from pyignite.datatypes.cluster_state import ClusterState +from pyignite.exceptions import NotSupportedByClusterError from pyignite.queries import Query, query_perform from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE @@ -50,6 +48,9 @@ def __post_process_get_state(result): def __cluster_get_state(connection, query_id): + if not connection.protocol_context.if_cluster_api_supported(): + raise NotSupportedByClusterError('Cluster API is not supported by the cluster') + query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id) return query_perform( query_struct, connection, @@ -87,9 +88,8 @@ def __post_process_set_state(result): def __cluster_set_state(connection, state, query_id): - if state == ClusterState.ACTIVE_READ_ONLY and \ - not connection.protocol_context.if_cluster_api_supported(): - warnings.warn(f'ClusterState.ACTIVE_READ_ONLY is not supported by the cluster', category=RuntimeWarning) + if not connection.protocol_context.if_cluster_api_supported(): + raise NotSupportedByClusterError('Cluster API is not supported by the cluster') query_struct = Query( OP_CLUSTER_CHANGE_STATE, diff --git a/pyignite/exceptions.py b/pyignite/exceptions.py index 0919fd4..215ccd0 100644 --- a/pyignite/exceptions.py +++ b/pyignite/exceptions.py @@ -101,4 +101,12 @@ class ClusterError(Exception): pass +class NotSupportedByClusterError(Exception): + """ + This exception is raised, whenever cluster is not supported specific + operation probably because it is outdated. + """ + pass + + connection_errors = (IOError, OSError, EOFError) diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py index 2efa517..6495802 100644 --- a/pyignite/queries/response.py +++ b/pyignite/queries/response.py @@ -45,7 +45,7 @@ def __build_header(self): ('query_id', ctypes.c_longlong), ] - if self.protocol_context and self.protocol_context.is_status_flags_supported(): + if self.protocol_context.is_status_flags_supported(): fields.append(('flags', ctypes.c_short)) else: fields.append(('status_code', ctypes.c_int),) @@ -69,7 +69,7 @@ def __parse_header(self, stream): fields = [] has_error = False - if self.protocol_context and self.protocol_context.is_status_flags_supported(): + if self.protocol_context.is_status_flags_supported(): if header.flags & RHF_TOPOLOGY_CHANGED: fields = [ ('affinity_version', ctypes.c_longlong), diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index 8b7d770..14e4124 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -42,7 +42,7 @@ def test_cluster_set_active(start_ignite_server, start_client, with_persistence, start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE try: - client = start_client() + client = start_client(timeout=0) with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): cluster = client.get_cluster() assert cluster.get_state() == start_state From 5426dafc57a315b28c9ef9048cbefd8d361dd3af Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 17:19:26 +0300 Subject: [PATCH 17/28] IGNITE-14465: Debug removed --- tests/custom/test_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index 14e4124..8b7d770 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -42,7 +42,7 @@ def test_cluster_set_active(start_ignite_server, start_client, with_persistence, start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE try: - client = start_client(timeout=0) + client = start_client() with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): cluster = client.get_cluster() assert cluster.get_state() == start_state From 525346b40dd039db55c2e6fc89c8c8627cc46c2f Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 09:11:17 -0700 Subject: [PATCH 18/28] IGNITE-14465: Style fix --- pyignite/client.py | 2 +- pyignite/connection/protocol_context.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyignite/client.py b/pyignite/client.py index 107c96c..b7c4046 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -109,7 +109,7 @@ def partition_aware(self): @property def partition_awareness_supported_by_protocol(self): return self.protocol_context is not None \ - and self.protocol_context.is_partition_awareness_supported() + and self.protocol_context.is_partition_awareness_supported() @property def compact_footer(self) -> bool: diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py index 9fe252a..7a7bb59 100644 --- a/pyignite/connection/protocol_context.py +++ b/pyignite/connection/protocol_context.py @@ -35,7 +35,7 @@ def __hash__(self): def __eq__(self, other): if isinstance(other, ProtocolContext): return self.version == other.version and \ - self.features == other.features + self.features == other.features return NotImplemented def _ensure_consistency(self): From 3a9eaa30b0c69aec3e573b3da5435d42e9e68657 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 09:51:39 -0700 Subject: [PATCH 19/28] IGNITE-14465: Re-factor tests --- tests/custom/test_cluster.py | 131 +++++++++++++++++------------------ tests/util.py | 4 +- 2 files changed, 66 insertions(+), 69 deletions(-) diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index 8b7d770..68eaf62 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -15,8 +15,9 @@ import pytest +from pyignite import Client, AioClient from pyignite.exceptions import CacheError -from tests.util import kill_process_tree, clear_ignite_work_dir +from tests.util import kill_process_tree, clear_ignite_work_dir, start_ignite_gen from pyignite.datatypes.cluster_state import ClusterState @@ -33,96 +34,92 @@ def cleanup(): clear_ignite_work_dir() -def test_cluster_set_active(start_ignite_server, start_client, with_persistence, cleanup): - key = 42 - val = 42 +@pytest.fixture(autouse=True) +def server1(with_persistence): + yield from start_ignite_gen(idx=1, use_persistence=with_persistence) + + +@pytest.fixture(autouse=True) +def server2(with_persistence): + yield from start_ignite_gen(idx=2, use_persistence=with_persistence) - server1 = start_ignite_server(idx=1, use_persistence=with_persistence) - server2 = start_ignite_server(idx=2, use_persistence=with_persistence) +def test_cluster_set_active(start_client, with_persistence): + key = 42 + val = 42 start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE - try: - client = start_client() - with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): - cluster = client.get_cluster() - assert cluster.get_state() == start_state - cluster.set_state(ClusterState.ACTIVE) - assert cluster.get_state() == ClusterState.ACTIVE + client = start_client() + with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): + cluster = client.get_cluster() + assert cluster.get_state() == start_state + + cluster.set_state(ClusterState.ACTIVE) + assert cluster.get_state() == ClusterState.ACTIVE - cache = client.get_or_create_cache("test_cache") - cache.put(key, val) - assert cache.get(key) == val + cache = client.get_or_create_cache("test_cache") + cache.put(key, val) + assert cache.get(key) == val - cluster.set_state(ClusterState.ACTIVE_READ_ONLY) - assert cluster.get_state() == ClusterState.ACTIVE_READ_ONLY + cluster.set_state(ClusterState.ACTIVE_READ_ONLY) + assert cluster.get_state() == ClusterState.ACTIVE_READ_ONLY - assert cache.get(key) == val - with pytest.raises(CacheError): - cache.put(key, val + 1) + assert cache.get(key) == val + with pytest.raises(CacheError): + cache.put(key, val + 1) - cluster.set_state(ClusterState.INACTIVE) - assert cluster.get_state() == ClusterState.INACTIVE + cluster.set_state(ClusterState.INACTIVE) + assert cluster.get_state() == ClusterState.INACTIVE - with pytest.raises(CacheError): - cache.get(key) + with pytest.raises(CacheError): + cache.get(key) - with pytest.raises(CacheError): - cache.put(key, val + 1) + with pytest.raises(CacheError): + cache.put(key, val + 1) - cluster.set_state(ClusterState.ACTIVE) - assert cluster.get_state() == ClusterState.ACTIVE + cluster.set_state(ClusterState.ACTIVE) + assert cluster.get_state() == ClusterState.ACTIVE - cache.put(key, val + 2) - assert cache.get(key) == val + 2 - finally: - kill_process_tree(server1.pid) - kill_process_tree(server2.pid) + cache.put(key, val + 2) + assert cache.get(key) == val + 2 @pytest.mark.asyncio -async def test_cluster_set_active_async(start_ignite_server, start_async_client, with_persistence, cleanup): +async def test_cluster_set_active_async(start_async_client, with_persistence): key = 42 val = 42 - - server1 = start_ignite_server(idx=1, use_persistence=with_persistence) - server2 = start_ignite_server(idx=2, use_persistence=with_persistence) - start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE - try: - client = start_async_client() - async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): - cluster = client.get_cluster() - assert await cluster.get_state() == start_state - await cluster.set_state(ClusterState.ACTIVE) - assert await cluster.get_state() == ClusterState.ACTIVE + client = start_async_client() + async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): + cluster = client.get_cluster() + assert await cluster.get_state() == start_state + + await cluster.set_state(ClusterState.ACTIVE) + assert await cluster.get_state() == ClusterState.ACTIVE - cache = await client.get_or_create_cache("test_cache") - await cache.put(key, val) - assert await cache.get(key) == val + cache = await client.get_or_create_cache("test_cache") + await cache.put(key, val) + assert await cache.get(key) == val - await cluster.set_state(ClusterState.ACTIVE_READ_ONLY) - assert await cluster.get_state() == ClusterState.ACTIVE_READ_ONLY + await cluster.set_state(ClusterState.ACTIVE_READ_ONLY) + assert await cluster.get_state() == ClusterState.ACTIVE_READ_ONLY - assert await cache.get(key) == val - with pytest.raises(CacheError): - await cache.put(key, val + 1) + assert await cache.get(key) == val + with pytest.raises(CacheError): + await cache.put(key, val + 1) - await cluster.set_state(ClusterState.INACTIVE) - assert await cluster.get_state() == ClusterState.INACTIVE + await cluster.set_state(ClusterState.INACTIVE) + assert await cluster.get_state() == ClusterState.INACTIVE - with pytest.raises(CacheError): - await cache.get(key) + with pytest.raises(CacheError): + await cache.get(key) - with pytest.raises(CacheError): - await cache.put(key, val + 1) + with pytest.raises(CacheError): + await cache.put(key, val + 1) - await cluster.set_state(ClusterState.ACTIVE) - assert await cluster.get_state() == ClusterState.ACTIVE + await cluster.set_state(ClusterState.ACTIVE) + assert await cluster.get_state() == ClusterState.ACTIVE - await cache.put(key, val + 2) - assert await cache.get(key) == val + 2 - finally: - kill_process_tree(server1.pid) - kill_process_tree(server2.pid) + await cache.put(key, val + 2) + assert await cache.get(key) == val + 2 diff --git a/tests/util.py b/tests/util.py index 5e5dcfc..af3b70e 100644 --- a/tests/util.py +++ b/tests/util.py @@ -193,8 +193,8 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False, use_persiste raise Exception("Failed to start Ignite: timeout while trying to connect") -def start_ignite_gen(idx=1, use_ssl=False, use_auth=False): - srv = start_ignite(idx, use_ssl=use_ssl, use_auth=use_auth) +def start_ignite_gen(idx=1, use_ssl=False, use_auth=False, use_persistence=False): + srv = start_ignite(idx, use_ssl=use_ssl, use_auth=use_auth, use_persistence=use_persistence) try: yield srv finally: From 3ce050762ea1bd3453936b0901a2dad7f967589a Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 11:52:22 -0700 Subject: [PATCH 20/28] IGNITE-14465: Fix --- tests/custom/test_cluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index 68eaf62..209522e 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -35,12 +35,12 @@ def cleanup(): @pytest.fixture(autouse=True) -def server1(with_persistence): +def server1(with_persistence, cleanup): yield from start_ignite_gen(idx=1, use_persistence=with_persistence) @pytest.fixture(autouse=True) -def server2(with_persistence): +def server2(with_persistence, cleanup): yield from start_ignite_gen(idx=2, use_persistence=with_persistence) From 33a4b2e63580653d3aab83af6bb69a0244d53660 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 11:53:43 -0700 Subject: [PATCH 21/28] IGNITE-14465: Remove unused imports --- tests/custom/test_cluster.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index 209522e..782878f 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -15,9 +15,8 @@ import pytest -from pyignite import Client, AioClient from pyignite.exceptions import CacheError -from tests.util import kill_process_tree, clear_ignite_work_dir, start_ignite_gen +from tests.util import clear_ignite_work_dir, start_ignite_gen from pyignite.datatypes.cluster_state import ClusterState From 80efb03a40bc0dc8100815f1c0b067250a736bed Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 11:59:21 -0700 Subject: [PATCH 22/28] IGNITE-14465: Fix tests --- tests/custom/conftest.py | 49 ------------------------------------ tests/custom/test_cluster.py | 9 ++++--- 2 files changed, 5 insertions(+), 53 deletions(-) delete mode 100644 tests/custom/conftest.py diff --git a/tests/custom/conftest.py b/tests/custom/conftest.py deleted file mode 100644 index 1d85d4a..0000000 --- a/tests/custom/conftest.py +++ /dev/null @@ -1,49 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -from pyignite import Client, AioClient -from tests.util import start_ignite - - -@pytest.fixture(scope='module') -def start_ignite_server(): - def start(idx=1, debug=False, use_ssl=False, use_auth=False, use_persistence=False): - return start_ignite( - idx=idx, - debug=debug, - use_ssl=use_ssl, - use_auth=use_auth, - use_persistence=use_persistence, - ) - - return start - - -@pytest.fixture(scope='module') -def start_client(): - def start(**kwargs): - return Client(**kwargs) - - return start - - -@pytest.fixture(scope='module') -def start_async_client(): - def start(**kwargs): - return AioClient(**kwargs) - - return start diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py index 782878f..e82e238 100644 --- a/tests/custom/test_cluster.py +++ b/tests/custom/test_cluster.py @@ -15,6 +15,7 @@ import pytest +from pyignite import Client, AioClient from pyignite.exceptions import CacheError from tests.util import clear_ignite_work_dir, start_ignite_gen @@ -43,12 +44,12 @@ def server2(with_persistence, cleanup): yield from start_ignite_gen(idx=2, use_persistence=with_persistence) -def test_cluster_set_active(start_client, with_persistence): +def test_cluster_set_active(with_persistence): key = 42 val = 42 start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE - client = start_client() + client = Client() with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): cluster = client.get_cluster() assert cluster.get_state() == start_state @@ -84,12 +85,12 @@ def test_cluster_set_active(start_client, with_persistence): @pytest.mark.asyncio -async def test_cluster_set_active_async(start_async_client, with_persistence): +async def test_cluster_set_active_async(with_persistence): key = 42 val = 42 start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE - client = start_async_client() + client = AioClient() async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): cluster = client.get_cluster() assert await cluster.get_state() == start_state From 8b7530f9b9b2ee7313ba94a64c84e646ee419513 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 12:27:24 -0700 Subject: [PATCH 23/28] IGNITE-14465: Fix ProtocolContext.__eq__ --- pyignite/connection/protocol_context.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py index 7a7bb59..3e6a516 100644 --- a/pyignite/connection/protocol_context.py +++ b/pyignite/connection/protocol_context.py @@ -33,10 +33,9 @@ def __hash__(self): return hash((self._version, self._features)) def __eq__(self, other): - if isinstance(other, ProtocolContext): - return self.version == other.version and \ - self.features == other.features - return NotImplemented + return isinstance(other, ProtocolContext) and \ + self.version == other.version and \ + self.features == other.features def _ensure_consistency(self): if not self.is_feature_flags_supported(): From 351fd4a15a357473d50c1641c9634a26d287c826 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 12:28:36 -0700 Subject: [PATCH 24/28] IGNITE-14465: Fix typo --- pyignite/api/cluster.py | 4 ++-- pyignite/connection/protocol_context.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pyignite/api/cluster.py b/pyignite/api/cluster.py index c59be18..e134239 100644 --- a/pyignite/api/cluster.py +++ b/pyignite/api/cluster.py @@ -48,7 +48,7 @@ def __post_process_get_state(result): def __cluster_get_state(connection, query_id): - if not connection.protocol_context.if_cluster_api_supported(): + if not connection.protocol_context.is_cluster_api_supported(): raise NotSupportedByClusterError('Cluster API is not supported by the cluster') query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id) @@ -88,7 +88,7 @@ def __post_process_set_state(result): def __cluster_set_state(connection, state, query_id): - if not connection.protocol_context.if_cluster_api_supported(): + if not connection.protocol_context.is_cluster_api_supported(): raise NotSupportedByClusterError('Cluster API is not supported by the cluster') query_struct = Query( diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py index 3e6a516..42a4078 100644 --- a/pyignite/connection/protocol_context.py +++ b/pyignite/connection/protocol_context.py @@ -91,7 +91,7 @@ def is_feature_flags_supported(self) -> bool: """ return self.version >= (1, 7, 0) - def if_cluster_api_supported(self) -> bool: + def is_cluster_api_supported(self) -> bool: """ Check whether cluster API supported by the current protocol. """ From dc3c4bf9d86c21b90540db89f11e845a2ea4b87b Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 19:59:55 +0300 Subject: [PATCH 25/28] IGNITE-14465: Re-factoring --- pyignite/connection/aio_connection.py | 4 ++-- pyignite/connection/connection.py | 4 ++-- pyignite/connection/protocol_context.py | 10 ++++++---- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index 854cb6a..ce32592 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -122,7 +122,7 @@ async def _connect(self) -> Union[dict, OrderedDict]: result = await self._connect_version() except HandshakeError as e: if e.expected_version in PROTOCOLS: - self.client.protocol_context.set_version(e.expected_version) + self.client.protocol_context.version = e.expected_version result = await self._connect_version() else: raise e @@ -134,7 +134,7 @@ async def _connect(self) -> Union[dict, OrderedDict]: # connection is ready for end user features = BitmaskFeature.from_array(result.get('features', None)) - self.client.protocol_context.try_set_features(features) + self.client.protocol_context.features = features self.uuid = result.get('node_uuid', None) # version-specific (1.4+) self.failed = False return result diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index e6b8938..7d5778c 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -174,7 +174,7 @@ def connect(self) -> Union[dict, OrderedDict]: result = self._connect_version() except HandshakeError as e: if e.expected_version in PROTOCOLS: - self.client.protocol_context.set_version(e.expected_version) + self.client.protocol_context.version = e.expected_version result = self._connect_version() else: raise e @@ -186,7 +186,7 @@ def connect(self) -> Union[dict, OrderedDict]: # connection is ready for end user features = BitmaskFeature.from_array(result.get('features', None)) - self.client.protocol_context.try_set_features(features) + self.client.protocol_context.features = features self.uuid = result.get('node_uuid', None) # version-specific (1.4+) self.failed = False return result diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py index 42a4078..54f5240 100644 --- a/pyignite/connection/protocol_context.py +++ b/pyignite/connection/protocol_context.py @@ -45,7 +45,8 @@ def _ensure_consistency(self): def version(self): return getattr(self, '_version', None) - def set_version(self, version: Tuple[int, int, int]): + @version.setter + def version(self, version: Tuple[int, int, int]): """ Set version. @@ -54,14 +55,15 @@ def set_version(self, version: Tuple[int, int, int]): :param version: Version to set. """ - self._version = version + setattr(self, '_version', version) self._ensure_consistency() @property def features(self): return getattr(self, '_features', None) - def try_set_features(self, features: BitmaskFeature): + @features.setter + def features(self, features: BitmaskFeature): """ Try and set new feature set. @@ -70,7 +72,7 @@ def try_set_features(self, features: BitmaskFeature): :param features: Features to set. """ - self._features = features + setattr(self, '_features', features) self._ensure_consistency() def is_partition_awareness_supported(self) -> bool: From bd5211c473c1fe474bacc6d1a86ba99db6b37c66 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 22:33:21 +0300 Subject: [PATCH 26/28] IGNITE-14465: Bitmask to bytes --- pyignite/connection/bitmask_feature.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyignite/connection/bitmask_feature.py b/pyignite/connection/bitmask_feature.py index 0b7768d..e6491f9 100644 --- a/pyignite/connection/bitmask_feature.py +++ b/pyignite/connection/bitmask_feature.py @@ -23,14 +23,14 @@ class BitmaskFeature(IntFlag): CLUSTER_API = 1 << 2 - def to_bytearray(self) -> bytearray: + def to_bytearray(self) -> bytes: """ Convert feature flags array to bytearray bitmask. :return: Bitmask as bytearray. """ full_bytes = self.bit_length() // 8 + 1 - return bytearray(self.to_bytes(full_bytes, byteorder=PROTOCOL_BYTE_ORDER)) + return self.to_bytes(full_bytes, byteorder=PROTOCOL_BYTE_ORDER) @staticmethod def all_supported() -> 'BitmaskFeature': @@ -45,7 +45,7 @@ def all_supported() -> 'BitmaskFeature': return supported @staticmethod - def from_array(features_array: bytearray) -> Optional['BitmaskFeature']: + def from_array(features_array: bytes) -> Optional['BitmaskFeature']: """ Get features from bytearray. From 63a54e4d1360ed619045b54c1a77e9813823bdef Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 22:36:35 +0300 Subject: [PATCH 27/28] IGNITE-14465: Fix --- pyignite/connection/bitmask_feature.py | 2 +- pyignite/connection/handshake.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyignite/connection/bitmask_feature.py b/pyignite/connection/bitmask_feature.py index e6491f9..80d51ad 100644 --- a/pyignite/connection/bitmask_feature.py +++ b/pyignite/connection/bitmask_feature.py @@ -23,7 +23,7 @@ class BitmaskFeature(IntFlag): CLUSTER_API = 1 << 2 - def to_bytearray(self) -> bytes: + def __bytes__(self) -> bytes: """ Convert feature flags array to bytearray bitmask. diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py index ad5aaee..eddd72d 100644 --- a/pyignite/connection/handshake.py +++ b/pyignite/connection/handshake.py @@ -73,7 +73,7 @@ def __create_handshake_data(self): 'client_code': 2, # fixed value defined by protocol } if self.protocol_context.is_feature_flags_supported(): - features = self.protocol_context.features.to_bytearray() + features = bytes(self.protocol_context.features) handshake_data.update({ 'features': features, }) From 52c4a6b4fc5c300ddcb9048dd905ef51a2017485 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 2 Apr 2021 23:09:55 +0300 Subject: [PATCH 28/28] IGNITE-14465: Fix --- pyignite/connection/handshake.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py index eddd72d..af7bdb3 100644 --- a/pyignite/connection/handshake.py +++ b/pyignite/connection/handshake.py @@ -44,9 +44,7 @@ def __init__( ] self.protocol_context = protocol_context if self.protocol_context.is_feature_flags_supported(): - fields.extend([ - ('features', ByteArrayObject), - ]) + fields.append(('features', ByteArrayObject)) if username and password: self.username = username self.password = password @@ -74,13 +72,8 @@ def __create_handshake_data(self): } if self.protocol_context.is_feature_flags_supported(): features = bytes(self.protocol_context.features) - handshake_data.update({ - 'features': features, - }) - handshake_data['length'] += sum([ - 5, - len(features) - ]) + handshake_data['features'] = features + handshake_data['length'] += 5 + len(features) if self.username and self.password: handshake_data.update({ 'username': self.username,