diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py index 5e64450..1870878 100644 --- a/pyignite/aio_client.py +++ b/pyignite/aio_client.py @@ -17,6 +17,7 @@ from itertools import chain from typing import Iterable, Type, Union, Any, Dict +from .aio_cluster import AioCluster from .api import cache_get_node_partitions_async from .api.binary import get_binary_type_async, put_binary_type_async from .api.cache_config import cache_get_names_async @@ -92,7 +93,7 @@ async def _connect(self, nodes): if not self.partition_aware: try: - if self.protocol_version is None: + if self.protocol_context is None: # open connection before adding to the pool await conn.connect() @@ -120,7 +121,7 @@ async def _connect(self, nodes): await asyncio.gather(*reconnect_coro, return_exceptions=True) - if self.protocol_version is None: + if self.protocol_context is None: raise ReconnectError('Can not connect.') async def close(self): @@ -460,3 +461,11 @@ def sql( return AioSqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout) + + def get_cluster(self) -> 'AioCluster': + """ + Gets client cluster facade. + + :return: AioClient cluster facade. + """ + return AioCluster(self) diff --git a/pyignite/aio_cluster.py b/pyignite/aio_cluster.py new file mode 100644 index 0000000..6d76125 --- /dev/null +++ b/pyignite/aio_cluster.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains `AioCluster` that lets you get info and change state of the +whole cluster asynchronously. +""" +from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async +from pyignite.exceptions import ClusterError +from pyignite.utils import status_to_exception + + +class AioCluster: + """ + Ignite cluster abstraction. Users should never use this class directly, + but construct its instances with + :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead. + """ + + def __init__(self, client: 'AioClient'): + self._client = client + + @status_to_exception(ClusterError) + async def get_state(self): + """ + Gets current cluster state. + + :return: Current cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return await cluster_get_state_async(await self._client.random_node()) + + @status_to_exception(ClusterError) + async def set_state(self, state): + """ + Changes current cluster state to the given. + + Note: Deactivation clears in-memory caches (without persistence) + including the system caches. + + :param state: New cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return await cluster_set_state_async(await self._client.random_node(), state) diff --git a/pyignite/api/cluster.py b/pyignite/api/cluster.py new file mode 100644 index 0000000..e134239 --- /dev/null +++ b/pyignite/api/cluster.py @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from pyignite.api import APIResult +from pyignite.connection import AioConnection, Connection +from pyignite.datatypes import Byte +from pyignite.exceptions import NotSupportedByClusterError +from pyignite.queries import Query, query_perform +from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE + + +def cluster_get_state(connection: 'Connection', query_id=None) -> 'APIResult': + """ + Get cluster state. + + :param connection: Connection to use, + :param query_id: (optional) a value generated by client and returned as-is + in response.query_id. When the parameter is omitted, a random value + is generated, + :return: API result data object. Contains zero status and a state + retrieved on success, non-zero status and an error description on failure. + """ + return __cluster_get_state(connection, query_id) + + +async def cluster_get_state_async(connection: 'AioConnection', query_id=None) -> 'APIResult': + """ + Async version of cluster_get_state + """ + return await __cluster_get_state(connection, query_id) + + +def __post_process_get_state(result): + if result.status == 0: + result.value = result.value['state'] + return result + + +def __cluster_get_state(connection, query_id): + if not connection.protocol_context.is_cluster_api_supported(): + raise NotSupportedByClusterError('Cluster API is not supported by the cluster') + + query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id) + return query_perform( + query_struct, connection, + response_config=[('state', Byte)], + post_process_fun=__post_process_get_state + ) + + +def cluster_set_state(connection: 'Connection', state: int, query_id=None) -> 'APIResult': + """ + Set cluster state. + + :param connection: Connection to use, + :param state: State to set, + :param query_id: (optional) a value generated by client and returned as-is + in response.query_id. When the parameter is omitted, a random value + is generated, + :return: API result data object. Contains zero status if a value + is written, non-zero status and an error description otherwise. + """ + return __cluster_set_state(connection, state, query_id) + + +async def cluster_set_state_async(connection: 'AioConnection', state: int, query_id=None) -> 'APIResult': + """ + Async version of cluster_get_state + """ + return await __cluster_set_state(connection, state, query_id) + + +def __post_process_set_state(result): + if result.status == 0: + result.value = result.value['state'] + return result + + +def __cluster_set_state(connection, state, query_id): + if not connection.protocol_context.is_cluster_api_supported(): + raise NotSupportedByClusterError('Cluster API is not supported by the cluster') + + query_struct = Query( + OP_CLUSTER_CHANGE_STATE, + [ + ('state', Byte) + ], + query_id=query_id + ) + return query_perform( + query_struct, connection, + query_params={ + 'state': state, + } + ) diff --git a/pyignite/client.py b/pyignite/client.py index 2f24c43..b7c4046 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -49,6 +49,7 @@ from .api import cache_get_node_partitions from .api.binary import get_binary_type, put_binary_type from .api.cache_config import cache_get_names +from .cluster import Cluster from .cursors import SqlFieldsCursor from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache from .connection import Connection @@ -83,24 +84,23 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, * self._partition_aware = partition_aware self.affinity_version = (0, 0) self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)} - self._protocol_version = None + self._protocol_context = None @property - def protocol_version(self): + def protocol_context(self): """ - Returns the tuple of major, minor, and revision numbers of the used - thin protocol version, or None, if no connection to the Ignite cluster - was not yet established. + Returns protocol context, or None, if no connection to the Ignite + cluster was not yet established. This method is not a part of the public API. Unless you wish to extend the `pyignite` capabilities (with additional testing, logging, examining connections, et c.) you probably should not use it. """ - return self._protocol_version + return self._protocol_context - @protocol_version.setter - def protocol_version(self, value): - self._protocol_version = value + @protocol_context.setter + def protocol_context(self, value): + self._protocol_context = value @property def partition_aware(self): @@ -108,7 +108,8 @@ def partition_aware(self): @property def partition_awareness_supported_by_protocol(self): - return self.protocol_version is not None and self.protocol_version >= (1, 4, 0) + return self.protocol_context is not None \ + and self.protocol_context.is_partition_awareness_supported() @property def compact_footer(self) -> bool: @@ -379,7 +380,7 @@ def _connect(self, nodes): conn = Connection(self, host, port, **self._connection_args) try: - if self.protocol_version is None or self.partition_aware: + if self.protocol_context is None or self.partition_aware: # open connection before adding to the pool conn.connect() @@ -396,7 +397,7 @@ def _connect(self, nodes): self._nodes.append(conn) - if self.protocol_version is None: + if self.protocol_context is None: raise ReconnectError('Can not connect.') def close(self): @@ -727,3 +728,11 @@ def sql( return SqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout) + + def get_cluster(self) -> 'Cluster': + """ + Gets client cluster facade. + + :return: Client cluster facade. + """ + return Cluster(self) diff --git a/pyignite/cluster.py b/pyignite/cluster.py new file mode 100644 index 0000000..f10afe4 --- /dev/null +++ b/pyignite/cluster.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains `Cluster` that lets you get info and change state of the +whole cluster. +""" +from pyignite.api.cluster import cluster_get_state, cluster_set_state +from pyignite.exceptions import ClusterError +from pyignite.utils import status_to_exception + + +class Cluster: + """ + Ignite cluster abstraction. Users should never use this class directly, + but construct its instances with + :py:meth:`~pyignite.client.Client.get_cluster` method instead. + """ + + def __init__(self, client: 'Client'): + self._client = client + + @status_to_exception(ClusterError) + def get_state(self): + """ + Gets current cluster state. + + :return: Current cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return cluster_get_state(self._client.random_node) + + @status_to_exception(ClusterError) + def set_state(self, state): + """ + Changes current cluster state to the given. + + Note: Deactivation clears in-memory caches (without persistence) + including the system caches. + + :param state: New cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return cluster_set_state(self._client.random_node, state) diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py index e5c11da..ce32592 100644 --- a/pyignite/connection/aio_connection.py +++ b/pyignite/connection/aio_connection.py @@ -36,9 +36,11 @@ from pyignite.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER from pyignite.exceptions import HandshakeError, SocketError, connection_errors +from .bitmask_feature import BitmaskFeature from .connection import BaseConnection from .handshake import HandshakeRequest, HandshakeResponse +from .protocol_context import ProtocolContext from .ssl import create_ssl_context from ..stream import AioBinaryStream @@ -112,27 +114,28 @@ async def _connect(self) -> Union[dict, OrderedDict]: detecting_protocol = False # choose highest version first - if self.client.protocol_version is None: + if self.client.protocol_context is None: detecting_protocol = True - self.client.protocol_version = max(PROTOCOLS) + self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported()) try: result = await self._connect_version() except HandshakeError as e: if e.expected_version in PROTOCOLS: - self.client.protocol_version = e.expected_version + self.client.protocol_context.version = e.expected_version result = await self._connect_version() else: raise e except connection_errors: # restore undefined protocol version if detecting_protocol: - self.client.protocol_version = None + self.client.protocol_context = None raise # connection is ready for end user + features = BitmaskFeature.from_array(result.get('features', None)) + self.client.protocol_context.features = features self.uuid = result.get('node_uuid', None) # version-specific (1.4+) - self.failed = False return result @@ -145,10 +148,10 @@ async def _connect_version(self) -> Union[dict, OrderedDict]: ssl_context = create_ssl_context(self.ssl_params) self._reader, self._writer = await asyncio.open_connection(self.host, self.port, ssl=ssl_context) - protocol_version = self.client.protocol_version + protocol_context = self.client.protocol_context hs_request = HandshakeRequest( - protocol_version, + protocol_context, self.username, self.password ) @@ -158,7 +161,7 @@ async def _connect_version(self) -> Union[dict, OrderedDict]: await self._send(stream.getbuffer(), reconnect=False) with AioBinaryStream(self.client, await self._recv(reconnect=False)) as stream: - hs_response = await HandshakeResponse.parse_async(stream, self.protocol_version) + hs_response = await HandshakeResponse.parse_async(stream, self.protocol_context) if hs_response.op_code == 0: self._close() diff --git a/pyignite/connection/bitmask_feature.py b/pyignite/connection/bitmask_feature.py new file mode 100644 index 0000000..80d51ad --- /dev/null +++ b/pyignite/connection/bitmask_feature.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from enum import IntFlag +from typing import Optional + +from pyignite.constants import PROTOCOL_BYTE_ORDER + + +class BitmaskFeature(IntFlag): + CLUSTER_API = 1 << 2 + + def __bytes__(self) -> bytes: + """ + Convert feature flags array to bytearray bitmask. + + :return: Bitmask as bytearray. + """ + full_bytes = self.bit_length() // 8 + 1 + return self.to_bytes(full_bytes, byteorder=PROTOCOL_BYTE_ORDER) + + @staticmethod + def all_supported() -> 'BitmaskFeature': + """ + Get all supported features. + + :return: All supported features. + """ + supported = BitmaskFeature(0) + for feature in BitmaskFeature: + supported |= feature + return supported + + @staticmethod + def from_array(features_array: bytes) -> Optional['BitmaskFeature']: + """ + Get features from bytearray. + + :param features_array: Feature bitmask as array, + :return: Return features. + """ + if features_array is None: + return None + return BitmaskFeature.from_bytes(features_array, byteorder=PROTOCOL_BYTE_ORDER) diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py index 901cb56..7d5778c 100644 --- a/pyignite/connection/connection.py +++ b/pyignite/connection/connection.py @@ -13,29 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - from collections import OrderedDict import socket from typing import Union from pyignite.constants import PROTOCOLS, IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError +from .bitmask_feature import BitmaskFeature from .handshake import HandshakeRequest, HandshakeResponse +from .protocol_context import ProtocolContext from .ssl import wrap, check_ssl_params from ..stream import BinaryStream @@ -83,19 +70,18 @@ def __repr__(self) -> str: return '{}:{}'.format(self.host or '?', self.port or '?') @property - def protocol_version(self): + def protocol_context(self): """ - Returns the tuple of major, minor, and revision numbers of the used - thin protocol version, or None, if no connection to the Ignite cluster - was yet established. + Returns protocol context, or None, if no connection to the Ignite + cluster was yet established. """ - return self.client.protocol_version + return self.client.protocol_context def _process_handshake_error(self, response): error_text = f'Handshake error: {response.message}' # if handshake fails for any reason other than protocol mismatch # (i.e. authentication error), server version is 0.0.0 - protocol_version = self.client.protocol_version + protocol_version = self.client.protocol_context.version server_version = (response.version_major, response.version_minor, response.version_patch) if any(server_version): @@ -118,7 +104,7 @@ class Connection(BaseConnection): * binary protocol connector. Encapsulates handshake and failover reconnection. """ - def __init__(self, client: 'Client', host: str, port: int, timeout: float = 2.0, + def __init__(self, client: 'Client', host: str, port: int, timeout: float = None, username: str = None, password: str = None, **ssl_params): """ Initialize connection. @@ -180,25 +166,27 @@ def connect(self) -> Union[dict, OrderedDict]: detecting_protocol = False # choose highest version first - if self.client.protocol_version is None: + if self.client.protocol_context is None: detecting_protocol = True - self.client.protocol_version = max(PROTOCOLS) + self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported()) try: result = self._connect_version() except HandshakeError as e: if e.expected_version in PROTOCOLS: - self.client.protocol_version = e.expected_version + self.client.protocol_context.version = e.expected_version result = self._connect_version() else: raise e except connection_errors: # restore undefined protocol version if detecting_protocol: - self.client.protocol_version = None + self.client.protocol_context = None raise # connection is ready for end user + features = BitmaskFeature.from_array(result.get('features', None)) + self.client.protocol_context.features = features self.uuid = result.get('node_uuid', None) # version-specific (1.4+) self.failed = False return result @@ -214,10 +202,10 @@ def _connect_version(self) -> Union[dict, OrderedDict]: self._socket = wrap(self._socket, self.ssl_params) self._socket.connect((self.host, self.port)) - protocol_version = self.client.protocol_version + protocol_context = self.client.protocol_context hs_request = HandshakeRequest( - protocol_version, + protocol_context, self.username, self.password ) @@ -227,7 +215,7 @@ def _connect_version(self) -> Union[dict, OrderedDict]: self.send(stream.getbuffer(), reconnect=False) with BinaryStream(self.client, self.recv(reconnect=False)) as stream: - hs_response = HandshakeResponse.parse(stream, self.protocol_version) + hs_response = HandshakeResponse.parse(stream, self.protocol_context) if hs_response.op_code == 0: self.close() diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py index 0b0fe50..af7bdb3 100644 --- a/pyignite/connection/handshake.py +++ b/pyignite/connection/handshake.py @@ -13,9 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, Tuple +from typing import Optional -from pyignite.datatypes import Byte, Int, Short, String, UUIDObject +from pyignite.connection.protocol_context import ProtocolContext +from pyignite.datatypes import Byte, Int, Short, String, UUIDObject, ByteArrayObject from pyignite.datatypes.internal import Struct from pyignite.stream import READ_BACKWARD @@ -27,10 +28,10 @@ class HandshakeRequest: handshake_struct = None username = None password = None - protocol_version = None + protocol_context = None def __init__( - self, protocol_version: Tuple[int, int, int], + self, protocol_context: 'ProtocolContext', username: Optional[str] = None, password: Optional[str] = None ): fields = [ @@ -41,7 +42,9 @@ def __init__( ('version_patch', Short), ('client_code', Byte), ] - self.protocol_version = protocol_version + self.protocol_context = protocol_context + if self.protocol_context.is_feature_flags_supported(): + fields.append(('features', ByteArrayObject)) if username and password: self.username = username self.password = password @@ -58,14 +61,19 @@ async def from_python_async(self, stream): await self.handshake_struct.from_python_async(stream, self.__create_handshake_data()) def __create_handshake_data(self): + version = self.protocol_context.version handshake_data = { 'length': 8, 'op_code': OP_HANDSHAKE, - 'version_major': self.protocol_version[0], - 'version_minor': self.protocol_version[1], - 'version_patch': self.protocol_version[2], + 'version_major': version[0], + 'version_minor': version[1], + 'version_patch': version[2], 'client_code': 2, # fixed value defined by protocol } + if self.protocol_context.is_feature_flags_supported(): + features = bytes(self.protocol_context.features) + handshake_data['features'] = features + handshake_data['length'] += 5 + len(features) if self.username and self.password: handshake_data.update({ 'username': self.username, @@ -96,12 +104,12 @@ def __getattr__(self, item): return self.get(item) @classmethod - def parse(cls, stream, protocol_version): + def parse(cls, stream, protocol_context): start_class = cls.__response_start.parse(stream) start = stream.read_ctype(start_class, direction=READ_BACKWARD) data = cls.__response_start.to_python(start) - response_end = cls.__create_response_end(data, protocol_version) + response_end = cls.__create_response_end(data, protocol_context) if response_end: end_class = response_end.parse(stream) end = stream.read_ctype(end_class, direction=READ_BACKWARD) @@ -110,12 +118,12 @@ def parse(cls, stream, protocol_version): return cls(data) @classmethod - async def parse_async(cls, stream, protocol_version): + async def parse_async(cls, stream, protocol_context): start_class = cls.__response_start.parse(stream) start = stream.read_ctype(start_class, direction=READ_BACKWARD) data = await cls.__response_start.to_python_async(start) - response_end = cls.__create_response_end(data, protocol_version) + response_end = cls.__create_response_end(data, protocol_context) if response_end: end_class = await response_end.parse_async(stream) end = stream.read_ctype(end_class, direction=READ_BACKWARD) @@ -124,7 +132,7 @@ async def parse_async(cls, stream, protocol_version): return cls(data) @classmethod - def __create_response_end(cls, start_data, protocol_version): + def __create_response_end(cls, start_data, protocol_context): response_end = None if start_data['op_code'] == 0: response_end = Struct([ @@ -134,7 +142,12 @@ def __create_response_end(cls, start_data, protocol_version): ('message', String), ('client_status', Int) ]) - elif protocol_version >= (1, 4, 0): + elif protocol_context.is_feature_flags_supported(): + response_end = Struct([ + ('features', ByteArrayObject), + ('node_uuid', UUIDObject), + ]) + elif protocol_context.is_partition_awareness_supported(): response_end = Struct([ ('node_uuid', UUIDObject), ]) diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py new file mode 100644 index 0000000..54f5240 --- /dev/null +++ b/pyignite/connection/protocol_context.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Tuple + +from pyignite.connection.bitmask_feature import BitmaskFeature + + +class ProtocolContext: + """ + Protocol context. Provides ability to easily check supported supported + protocol features. + """ + + def __init__(self, version: Tuple[int, int, int], features: BitmaskFeature = None): + self._version = version + self._features = features + self._ensure_consistency() + + def __hash__(self): + return hash((self._version, self._features)) + + def __eq__(self, other): + return isinstance(other, ProtocolContext) and \ + self.version == other.version and \ + self.features == other.features + + def _ensure_consistency(self): + if not self.is_feature_flags_supported(): + self._features = None + + @property + def version(self): + return getattr(self, '_version', None) + + @version.setter + def version(self, version: Tuple[int, int, int]): + """ + Set version. + + This call may result in features being reset to None if the protocol + version does not support feature masks. + + :param version: Version to set. + """ + setattr(self, '_version', version) + self._ensure_consistency() + + @property + def features(self): + return getattr(self, '_features', None) + + @features.setter + def features(self, features: BitmaskFeature): + """ + Try and set new feature set. + + If features are not supported by the protocol, None is set as features + instead. + + :param features: Features to set. + """ + setattr(self, '_features', features) + self._ensure_consistency() + + def is_partition_awareness_supported(self) -> bool: + """ + Check whether partition awareness supported by the current protocol. + """ + return self.version >= (1, 4, 0) + + def is_status_flags_supported(self) -> bool: + """ + Check whether status flags supported by the current protocol. + """ + return self.version >= (1, 4, 0) + + def is_feature_flags_supported(self) -> bool: + """ + Check whether feature flags supported by the current protocol. + """ + return self.version >= (1, 7, 0) + + def is_cluster_api_supported(self) -> bool: + """ + Check whether cluster API supported by the current protocol. + """ + return self.features and BitmaskFeature.CLUSTER_API in self.features diff --git a/pyignite/constants.py b/pyignite/constants.py index 02f7124..c08a3ce 100644 --- a/pyignite/constants.py +++ b/pyignite/constants.py @@ -31,14 +31,17 @@ ] PROTOCOLS = { + (1, 7, 0), + (1, 6, 0), + (1, 5, 0), (1, 4, 0), (1, 3, 0), (1, 2, 0), } PROTOCOL_VERSION_MAJOR = 1 -PROTOCOL_VERSION_MINOR = 4 -PROTOCOL_VERSION_PATCH = 0 +PROTOCOL_VERSION_MINOR = 7 +PROTOCOL_VERSION_PATCH = 1 MAX_LONG = 9223372036854775807 MIN_LONG = -9223372036854775808 diff --git a/pyignite/datatypes/cluster_state.py b/pyignite/datatypes/cluster_state.py new file mode 100644 index 0000000..863a1d2 --- /dev/null +++ b/pyignite/datatypes/cluster_state.py @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import IntEnum + + +class ClusterState(IntEnum): + #: Cluster deactivated. Cache operations aren't allowed. + INACTIVE = 0 + + #: Cluster activated. All cache operations are allowed. + ACTIVE = 1 + + #: Cluster activated. Cache read operation allowed, Cache data change operation + #: aren't allowed. + ACTIVE_READ_ONLY = 2 diff --git a/pyignite/exceptions.py b/pyignite/exceptions.py index 579aa29..215ccd0 100644 --- a/pyignite/exceptions.py +++ b/pyignite/exceptions.py @@ -65,7 +65,7 @@ class ParameterError(Exception): class CacheError(Exception): """ - This exception is raised, whenever any remote Thin client operation + This exception is raised, whenever any remote Thin client cache operation returns an error. """ pass @@ -93,4 +93,20 @@ class SQLError(CacheError): pass +class ClusterError(Exception): + """ + This exception is raised, whenever any remote Thin client cluster operation + returns an error. + """ + pass + + +class NotSupportedByClusterError(Exception): + """ + This exception is raised, whenever cluster is not supported specific + operation probably because it is outdated. + """ + pass + + connection_errors = (IOError, OSError, EOFError) diff --git a/pyignite/queries/op_codes.py b/pyignite/queries/op_codes.py index 7372713..c152f7c 100644 --- a/pyignite/queries/op_codes.py +++ b/pyignite/queries/op_codes.py @@ -61,7 +61,10 @@ OP_QUERY_SQL_FIELDS = 2004 OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE = 2005 -P_GET_BINARY_TYPE_NAME = 3000 +OP_GET_BINARY_TYPE_NAME = 3000 OP_REGISTER_BINARY_TYPE_NAME = 3001 OP_GET_BINARY_TYPE = 3002 OP_PUT_BINARY_TYPE = 3003 + +OP_CLUSTER_GET_STATE = 5000 +OP_CLUSTER_CHANGE_STATE = 5001 diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py index beea5d9..d9e6aaf 100644 --- a/pyignite/queries/query.py +++ b/pyignite/queries/query.py @@ -124,7 +124,7 @@ def perform( self.from_python(stream, query_params) response_data = conn.request(stream.getbuffer()) - response_struct = self.response_type(protocol_version=conn.protocol_version, + response_struct = self.response_type(protocol_context=conn.protocol_context, following=response_config, **kwargs) with BinaryStream(conn.client, response_data) as stream: @@ -156,7 +156,7 @@ async def perform_async( await self.from_python_async(stream, query_params) data = await conn.request(stream.getbuffer()) - response_struct = self.response_type(protocol_version=conn.protocol_version, + response_struct = self.response_type(protocol_context=conn.protocol_context, following=response_config, **kwargs) with AioBinaryStream(conn.client, data) as stream: diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py index 83a6e6a..6495802 100644 --- a/pyignite/queries/response.py +++ b/pyignite/queries/response.py @@ -19,6 +19,7 @@ from collections import OrderedDict import ctypes +from pyignite.connection.protocol_context import ProtocolContext from pyignite.constants import RHF_TOPOLOGY_CHANGED, RHF_ERROR from pyignite.datatypes import AnyDataObject, Bool, Int, Long, String, StringArray, Struct from pyignite.datatypes.binary import body_struct, enum_struct, schema_struct @@ -29,7 +30,7 @@ @attr.s class Response: following = attr.ib(type=list, factory=list) - protocol_version = attr.ib(type=tuple, factory=tuple) + protocol_context = attr.ib(type=type(ProtocolContext), default=None) _response_header = None _response_class_name = 'Response' @@ -44,7 +45,7 @@ def __build_header(self): ('query_id', ctypes.c_longlong), ] - if self.protocol_version and self.protocol_version >= (1, 4, 0): + if self.protocol_context.is_status_flags_supported(): fields.append(('flags', ctypes.c_short)) else: fields.append(('status_code', ctypes.c_int),) @@ -68,7 +69,7 @@ def __parse_header(self, stream): fields = [] has_error = False - if self.protocol_version and self.protocol_version >= (1, 4, 0): + if self.protocol_context.is_status_flags_supported(): if header.flags & RHF_TOPOLOGY_CHANGED: fields = [ ('affinity_version', ctypes.c_longlong), diff --git a/pyignite/stream/aio_cluster.py b/pyignite/stream/aio_cluster.py new file mode 100644 index 0000000..8a2f98e --- /dev/null +++ b/pyignite/stream/aio_cluster.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains `AioCluster` that lets you get info and change state of the +whole cluster. +""" +from pyignite import AioClient +from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async + + +class AioCluster: + """ + Ignite cluster abstraction. Users should never use this class directly, + but construct its instances with + :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead. + """ + + def __init__(self, client: 'AioClient'): + self._client = client + + async def get_state(self): + """ + Gets current cluster state. + + :return: Current cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return await cluster_get_state_async(await self._client.random_node()) + + async def set_state(self, state): + """ + Changes current cluster state to the given. + + Note: Deactivation clears in-memory caches (without persistence) + including the system caches. + + :param state: New cluster state. This is one of ClusterState.INACTIVE, + ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY. + """ + return await cluster_set_state_async(await self._client.random_node(), state) diff --git a/tests/config/ignite-config.xml.jinja2 b/tests/config/ignite-config.xml.jinja2 index 2bf5129..325a581 100644 --- a/tests/config/ignite-config.xml.jinja2 +++ b/tests/config/ignite-config.xml.jinja2 @@ -31,7 +31,7 @@ - {% if use_auth %} + {% if use_persistence %} {% endif %} @@ -51,9 +51,8 @@ {% endif %} - {% if use_ssl %} - + {% endif %} diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py new file mode 100644 index 0000000..e82e238 --- /dev/null +++ b/tests/custom/test_cluster.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from pyignite import Client, AioClient +from pyignite.exceptions import CacheError +from tests.util import clear_ignite_work_dir, start_ignite_gen + +from pyignite.datatypes.cluster_state import ClusterState + + +@pytest.fixture(params=['with-persistence', 'without-persistence']) +def with_persistence(request): + yield request.param == 'with-persistence' + + +@pytest.fixture(autouse=True) +def cleanup(): + clear_ignite_work_dir() + yield None + clear_ignite_work_dir() + + +@pytest.fixture(autouse=True) +def server1(with_persistence, cleanup): + yield from start_ignite_gen(idx=1, use_persistence=with_persistence) + + +@pytest.fixture(autouse=True) +def server2(with_persistence, cleanup): + yield from start_ignite_gen(idx=2, use_persistence=with_persistence) + + +def test_cluster_set_active(with_persistence): + key = 42 + val = 42 + start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE + + client = Client() + with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): + cluster = client.get_cluster() + assert cluster.get_state() == start_state + + cluster.set_state(ClusterState.ACTIVE) + assert cluster.get_state() == ClusterState.ACTIVE + + cache = client.get_or_create_cache("test_cache") + cache.put(key, val) + assert cache.get(key) == val + + cluster.set_state(ClusterState.ACTIVE_READ_ONLY) + assert cluster.get_state() == ClusterState.ACTIVE_READ_ONLY + + assert cache.get(key) == val + with pytest.raises(CacheError): + cache.put(key, val + 1) + + cluster.set_state(ClusterState.INACTIVE) + assert cluster.get_state() == ClusterState.INACTIVE + + with pytest.raises(CacheError): + cache.get(key) + + with pytest.raises(CacheError): + cache.put(key, val + 1) + + cluster.set_state(ClusterState.ACTIVE) + assert cluster.get_state() == ClusterState.ACTIVE + + cache.put(key, val + 2) + assert cache.get(key) == val + 2 + + +@pytest.mark.asyncio +async def test_cluster_set_active_async(with_persistence): + key = 42 + val = 42 + start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE + + client = AioClient() + async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]): + cluster = client.get_cluster() + assert await cluster.get_state() == start_state + + await cluster.set_state(ClusterState.ACTIVE) + assert await cluster.get_state() == ClusterState.ACTIVE + + cache = await client.get_or_create_cache("test_cache") + await cache.put(key, val) + assert await cache.get(key) == val + + await cluster.set_state(ClusterState.ACTIVE_READ_ONLY) + assert await cluster.get_state() == ClusterState.ACTIVE_READ_ONLY + + assert await cache.get(key) == val + with pytest.raises(CacheError): + await cache.put(key, val + 1) + + await cluster.set_state(ClusterState.INACTIVE) + assert await cluster.get_state() == ClusterState.INACTIVE + + with pytest.raises(CacheError): + await cache.get(key) + + with pytest.raises(CacheError): + await cache.put(key, val + 1) + + await cluster.set_state(ClusterState.ACTIVE) + assert await cluster.get_state() == ClusterState.ACTIVE + + await cache.put(key, val + 2) + assert await cache.get(key) == val + 2 diff --git a/tests/util.py b/tests/util.py index 5651739..af3b70e 100644 --- a/tests/util.py +++ b/tests/util.py @@ -155,7 +155,7 @@ def create_config_file(tpl_name, file_name, **kwargs): f.write(template.render(**kwargs)) -def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False): +def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False, use_persistence=False): clear_logs(idx) runner = get_ignite_runner() @@ -166,8 +166,16 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False): env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \ "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 " - params = {'ignite_instance_idx': str(idx), 'ignite_client_port': 10800 + idx, 'use_ssl': use_ssl, - 'use_auth': use_auth} + if use_auth: + use_persistence = True + + params = { + 'ignite_instance_idx': str(idx), + 'ignite_client_port': 10800 + idx, + 'use_ssl': use_ssl, + 'use_auth': use_auth, + 'use_persistence': use_persistence, + } create_config_file('log4j.xml.jinja2', f'log4j-{idx}.xml', **params) create_config_file('ignite-config.xml.jinja2', f'ignite-config-{idx}.xml', **params) @@ -177,7 +185,7 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False): srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir()) - started = wait_for_condition(lambda: check_server_started(idx), timeout=30) + started = wait_for_condition(lambda: check_server_started(idx), timeout=60) if started: return srv @@ -185,8 +193,8 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False): raise Exception("Failed to start Ignite: timeout while trying to connect") -def start_ignite_gen(idx=1, use_ssl=False, use_auth=False): - srv = start_ignite(idx, use_ssl=use_ssl, use_auth=use_auth) +def start_ignite_gen(idx=1, use_ssl=False, use_auth=False, use_persistence=False): + srv = start_ignite(idx, use_ssl=use_ssl, use_auth=use_auth, use_persistence=use_persistence) try: yield srv finally: