Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
db9d5e9
IGNITE-14465 Add the ability to set and get cluster state
isapego Apr 1, 2021
c0fed25
IGNITE-14465: Protocol context introduced
isapego Apr 2, 2021
76aca48
IGNITE-14465: Fix issues
isapego Apr 2, 2021
b147482
IGNITE-14465: Add tests
isapego Apr 2, 2021
eda90a4
IGNITE-14465: Fixes
isapego Apr 2, 2021
57b0db9
IGNITE-14465: Fix async tests
isapego Apr 2, 2021
caaa7ec
IGNITE-14465: Improve tests
isapego Apr 2, 2021
d15a2fb
IGNITE-14465: Fix tests config
isapego Apr 2, 2021
a8b509c
IGNITE-14465: BitmaskFeature(IntEnum -> IntFlag)
isapego Apr 2, 2021
3c26217
IGNITE-14465: Fix for cluster tests
isapego Apr 2, 2021
c375426
IGNITE-14465: Fix tests
isapego Apr 2, 2021
1404bdb
IGNITE-14465: Changed default timeout to None
isapego Apr 2, 2021
942b61c
IGNITE-14465: Fix feature bitmask retrieval
isapego Apr 2, 2021
17ccb31
IGNITE-14465: Made protocol context consistent
isapego Apr 2, 2021
1db8580
IGNITE-14465: Make ProtocolContext hashable
isapego Apr 2, 2021
5e64f24
IGNITE-14465: Review fixes
isapego Apr 2, 2021
5426daf
IGNITE-14465: Debug removed
isapego Apr 2, 2021
525346b
IGNITE-14465: Style fix
isapego Apr 2, 2021
3a9eaa3
IGNITE-14465: Re-factor tests
isapego Apr 2, 2021
3ce0507
IGNITE-14465: Fix
isapego Apr 2, 2021
33a4b2e
IGNITE-14465: Remove unused imports
isapego Apr 2, 2021
80efb03
IGNITE-14465: Fix tests
isapego Apr 2, 2021
8b7530f
IGNITE-14465: Fix ProtocolContext.__eq__
isapego Apr 2, 2021
351fd4a
IGNITE-14465: Fix typo
isapego Apr 2, 2021
dc3c4bf
IGNITE-14465: Re-factoring
isapego Apr 2, 2021
bd5211c
IGNITE-14465: Bitmask to bytes
isapego Apr 2, 2021
63a54e4
IGNITE-14465: Fix
isapego Apr 2, 2021
52c4a6b
IGNITE-14465: Fix
isapego Apr 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pyignite/aio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from itertools import chain
from typing import Iterable, Type, Union, Any, Dict

from .aio_cluster import AioCluster
from .api import cache_get_node_partitions_async
from .api.binary import get_binary_type_async, put_binary_type_async
from .api.cache_config import cache_get_names_async
Expand Down Expand Up @@ -92,7 +93,7 @@ async def _connect(self, nodes):

if not self.partition_aware:
try:
if self.protocol_version is None:
if self.protocol_context is None:
# open connection before adding to the pool
await conn.connect()

Expand Down Expand Up @@ -120,7 +121,7 @@ async def _connect(self, nodes):

await asyncio.gather(*reconnect_coro, return_exceptions=True)

if self.protocol_version is None:
if self.protocol_context is None:
raise ReconnectError('Can not connect.')

async def close(self):
Expand Down Expand Up @@ -460,3 +461,11 @@ def sql(
return AioSqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type,
distributed_joins, local, replicated_only, enforce_join_order, collocated,
lazy, include_field_names, max_rows, timeout)

def get_cluster(self) -> 'AioCluster':
"""
Gets client cluster facade.

:return: AioClient cluster facade.
"""
return AioCluster(self)
56 changes: 56 additions & 0 deletions pyignite/aio_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains `AioCluster` that lets you get info and change state of the
whole cluster asynchronously.
"""
from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async
from pyignite.exceptions import ClusterError
from pyignite.utils import status_to_exception


class AioCluster:
"""
Ignite cluster abstraction. Users should never use this class directly,
but construct its instances with
:py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead.
"""

def __init__(self, client: 'AioClient'):
self._client = client

@status_to_exception(ClusterError)
async def get_state(self):
"""
Gets current cluster state.

:return: Current cluster state. This is one of ClusterState.INACTIVE,
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
"""
return await cluster_get_state_async(await self._client.random_node())

@status_to_exception(ClusterError)
async def set_state(self, state):
"""
Changes current cluster state to the given.

Note: Deactivation clears in-memory caches (without persistence)
including the system caches.

:param state: New cluster state. This is one of ClusterState.INACTIVE,
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
"""
return await cluster_set_state_async(await self._client.random_node(), state)
106 changes: 106 additions & 0 deletions pyignite/api/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyignite.api import APIResult
from pyignite.connection import AioConnection, Connection
from pyignite.datatypes import Byte
from pyignite.exceptions import NotSupportedByClusterError
from pyignite.queries import Query, query_perform
from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE


def cluster_get_state(connection: 'Connection', query_id=None) -> 'APIResult':
"""
Get cluster state.

:param connection: Connection to use,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and a state
retrieved on success, non-zero status and an error description on failure.
"""
return __cluster_get_state(connection, query_id)


async def cluster_get_state_async(connection: 'AioConnection', query_id=None) -> 'APIResult':
"""
Async version of cluster_get_state
"""
return await __cluster_get_state(connection, query_id)


def __post_process_get_state(result):
if result.status == 0:
result.value = result.value['state']
return result


def __cluster_get_state(connection, query_id):
if not connection.protocol_context.is_cluster_api_supported():
raise NotSupportedByClusterError('Cluster API is not supported by the cluster')

query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id)
return query_perform(
query_struct, connection,
response_config=[('state', Byte)],
post_process_fun=__post_process_get_state
)


def cluster_set_state(connection: 'Connection', state: int, query_id=None) -> 'APIResult':
"""
Set cluster state.

:param connection: Connection to use,
:param state: State to set,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status if a value
is written, non-zero status and an error description otherwise.
"""
return __cluster_set_state(connection, state, query_id)


async def cluster_set_state_async(connection: 'AioConnection', state: int, query_id=None) -> 'APIResult':
"""
Async version of cluster_get_state
"""
return await __cluster_set_state(connection, state, query_id)


def __post_process_set_state(result):
if result.status == 0:
result.value = result.value['state']
return result


def __cluster_set_state(connection, state, query_id):
if not connection.protocol_context.is_cluster_api_supported():
raise NotSupportedByClusterError('Cluster API is not supported by the cluster')

query_struct = Query(
OP_CLUSTER_CHANGE_STATE,
[
('state', Byte)
],
query_id=query_id
)
return query_perform(
query_struct, connection,
query_params={
'state': state,
}
)
33 changes: 21 additions & 12 deletions pyignite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from .api import cache_get_node_partitions
from .api.binary import get_binary_type, put_binary_type
from .api.cache_config import cache_get_names
from .cluster import Cluster
from .cursors import SqlFieldsCursor
from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache
from .connection import Connection
Expand Down Expand Up @@ -83,32 +84,32 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, *
self._partition_aware = partition_aware
self.affinity_version = (0, 0)
self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
self._protocol_version = None
self._protocol_context = None

@property
def protocol_version(self):
def protocol_context(self):
"""
Returns the tuple of major, minor, and revision numbers of the used
thin protocol version, or None, if no connection to the Ignite cluster
was not yet established.
Returns protocol context, or None, if no connection to the Ignite
cluster was not yet established.

This method is not a part of the public API. Unless you wish to
extend the `pyignite` capabilities (with additional testing, logging,
examining connections, et c.) you probably should not use it.
"""
return self._protocol_version
return self._protocol_context

@protocol_version.setter
def protocol_version(self, value):
self._protocol_version = value
@protocol_context.setter
def protocol_context(self, value):
self._protocol_context = value

@property
def partition_aware(self):
return self._partition_aware and self.partition_awareness_supported_by_protocol

@property
def partition_awareness_supported_by_protocol(self):
return self.protocol_version is not None and self.protocol_version >= (1, 4, 0)
return self.protocol_context is not None \
and self.protocol_context.is_partition_awareness_supported()

@property
def compact_footer(self) -> bool:
Expand Down Expand Up @@ -379,7 +380,7 @@ def _connect(self, nodes):
conn = Connection(self, host, port, **self._connection_args)

try:
if self.protocol_version is None or self.partition_aware:
if self.protocol_context is None or self.partition_aware:
# open connection before adding to the pool
conn.connect()

Expand All @@ -396,7 +397,7 @@ def _connect(self, nodes):

self._nodes.append(conn)

if self.protocol_version is None:
if self.protocol_context is None:
raise ReconnectError('Can not connect.')

def close(self):
Expand Down Expand Up @@ -727,3 +728,11 @@ def sql(
return SqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins,
local, replicated_only, enforce_join_order, collocated, lazy, include_field_names,
max_rows, timeout)

def get_cluster(self) -> 'Cluster':
"""
Gets client cluster facade.

:return: Client cluster facade.
"""
return Cluster(self)
56 changes: 56 additions & 0 deletions pyignite/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains `Cluster` that lets you get info and change state of the
whole cluster.
"""
from pyignite.api.cluster import cluster_get_state, cluster_set_state
from pyignite.exceptions import ClusterError
from pyignite.utils import status_to_exception


class Cluster:
"""
Ignite cluster abstraction. Users should never use this class directly,
but construct its instances with
:py:meth:`~pyignite.client.Client.get_cluster` method instead.
"""

def __init__(self, client: 'Client'):
self._client = client

@status_to_exception(ClusterError)
def get_state(self):
"""
Gets current cluster state.

:return: Current cluster state. This is one of ClusterState.INACTIVE,
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
"""
return cluster_get_state(self._client.random_node)

@status_to_exception(ClusterError)
def set_state(self, state):
"""
Changes current cluster state to the given.

Note: Deactivation clears in-memory caches (without persistence)
including the system caches.

:param state: New cluster state. This is one of ClusterState.INACTIVE,
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
"""
return cluster_set_state(self._client.random_node, state)
Loading