Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ of `pyignite`, intended for end users.
datatypes/parsers
datatypes/cache_props
Exceptions <source/pyignite.exceptions>
Monitoring and handling events <source/pyignite.monitoring>
20 changes: 20 additions & 0 deletions docs/source/pyignite.connection.protocol_context.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
.. Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

pyignite.connection.protocol_context package
===========================

.. automodule:: pyignite.connection.protocol_context
:members:
6 changes: 6 additions & 0 deletions docs/source/pyignite.connection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ pyignite.connection package
:members:
:undoc-members:
:show-inheritance:

Submodules
----------

.. toctree::
pyignite.connection.protocol_context
21 changes: 21 additions & 0 deletions docs/source/pyignite.monitoring.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
.. Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

pyignite.monitoring module
===========================

.. automodule:: pyignite.monitoring
:members:
:member-order: bysource
1 change: 1 addition & 0 deletions docs/source/pyignite.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ Submodules
pyignite.transaction
pyignite.cursors
pyignite.exceptions
pyignite.monitoring

17 changes: 9 additions & 8 deletions pyignite/aio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import random
import sys
from itertools import chain
from typing import Iterable, Type, Union, Any, Dict, Optional
from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence

from .aio_cluster import AioCluster
from .api import cache_get_node_partitions_async
Expand Down Expand Up @@ -60,7 +60,8 @@ class AioClient(BaseClient):
Asynchronous Client implementation.
"""

def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
event_listeners: Optional[Sequence] = None, **kwargs):
"""
Initialize client.

Expand All @@ -71,9 +72,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
:param partition_aware: (optional) try to calculate the exact data
placement from the key before to issue the key operation to the
server node, `True` by default.
server node, `True` by default,
:param event_listeners: (optional) event listeners.
"""
super().__init__(compact_footer, partition_aware, **kwargs)
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
self._registry_mux = asyncio.Lock()
self._affinity_query_mux = asyncio.Lock()

Expand All @@ -99,9 +101,8 @@ async def _connect(self, nodes):

# do not try to open more nodes
self._current_node = i

except connection_errors:
conn.failed = True
pass

self._nodes.append(conn)

Expand Down Expand Up @@ -301,7 +302,7 @@ async def _get_affinity(self, conn: 'AioConnection', caches: Iterable[int]) -> D
"""
for _ in range(AFFINITY_RETRIES or 1):
result = await cache_get_node_partitions_async(conn, caches)
if result.status == 0 and result.value['partition_mapping']:
if result.status == 0:
break
await asyncio.sleep(AFFINITY_DELAY)

Expand Down Expand Up @@ -341,7 +342,7 @@ async def get_best_node(

asyncio.ensure_future(
asyncio.gather(
*[conn.reconnect() for conn in self._nodes if not conn.alive],
*[node.reconnect() for node in self._nodes if not node.alive],
return_exceptions=True
)
)
Expand Down
24 changes: 14 additions & 10 deletions pyignite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import random
import re
from itertools import chain
from typing import Iterable, Type, Union, Any, Dict, Optional
from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence

from .api import cache_get_node_partitions
from .api.binary import get_binary_type, put_binary_type
Expand All @@ -66,6 +66,7 @@
get_field_by_id, unsigned
)
from .binary import GenericObjectMeta
from .monitoring import _EventListeners


__all__ = ['Client']
Expand All @@ -76,7 +77,8 @@ class BaseClient:
_identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE)
_ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE)

def __init__(self, compact_footer: bool = None, partition_aware: bool = False, **kwargs):
def __init__(self, compact_footer: bool = None, partition_aware: bool = False,
event_listeners: Optional[Sequence] = None, **kwargs):
self._compact_footer = compact_footer
self._partition_aware = partition_aware
self._connection_args = kwargs
Expand All @@ -87,6 +89,7 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, *
self.affinity_version = (0, 0)
self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
self._protocol_context = None
self._event_listeners = _EventListeners(event_listeners)

@property
def protocol_context(self):
Expand Down Expand Up @@ -338,7 +341,8 @@ class Client(BaseClient):
Synchronous Client implementation.
"""

def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
event_listeners: Optional[Sequence] = None, **kwargs):
"""
Initialize client.

Expand All @@ -349,9 +353,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
:param partition_aware: (optional) try to calculate the exact data
placement from the key before to issue the key operation to the
server node, `True` by default.
server node, `True` by default,
:param event_listeners: (optional) event listeners.
"""
super().__init__(compact_footer, partition_aware, **kwargs)
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)

def connect(self, *args):
"""
Expand Down Expand Up @@ -382,7 +387,6 @@ def _connect(self, nodes):
self._current_node = i

except connection_errors:
conn.failed = True
if self.partition_aware:
# schedule the reconnection
conn.reconnect()
Expand Down Expand Up @@ -565,7 +569,7 @@ def _get_affinity(self, conn: 'Connection', caches: Iterable[int]) -> Dict:
"""
for _ in range(AFFINITY_RETRIES or 1):
result = cache_get_node_partitions(conn, caches)
if result.status == 0 and result.value['partition_mapping']:
if result.status == 0:
break
time.sleep(AFFINITY_DELAY)

Expand Down Expand Up @@ -608,9 +612,9 @@ def get_best_node(

self._update_affinity(full_affinity)

for conn in self._nodes:
if not conn.alive:
conn.reconnect()
for node in self._nodes:
if not node.alive:
node.reconnect()

c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
parts = self._cache_partition_mapping(c_id).get('number_of_partitions')
Expand Down
2 changes: 1 addition & 1 deletion pyignite/connection/aio_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ async def _connect(self):
self._on_handshake_fail(e)
raise e
except Exception as e:
self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
self._on_handshake_fail(e)
raise e

self._on_handshake_success(result)
Expand Down
44 changes: 34 additions & 10 deletions pyignite/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def _process_handshake_error(self, response):
def _on_handshake_start(self):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s",
self.host, self.port, self.client.protocol_context)
self.host, self.port, self.protocol_context)
if self._enabled_connection_listener:
self._connection_listener.publish_handshake_start(self.host, self.port, self.protocol_context)

def _on_handshake_success(self, result):
features = BitmaskFeature.from_array(result.get('features', None))
Expand All @@ -109,24 +111,45 @@ def _on_handshake_success(self, result):

if logger.isEnabledFor(logging.DEBUG):
logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s",
self.host, self.port, self.uuid, self.client.protocol_context)
self.host, self.port, self.uuid, self.protocol_context)
if self._enabled_connection_listener:
self._connection_listener.publish_handshake_success(self.host, self.port, self.protocol_context, self.uuid)

def _on_handshake_fail(self, err):
self.failed = True

if isinstance(err, AuthenticationError):
logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s",
self.host, self.port, err)
if self._enabled_connection_listener:
self._connection_listener.publish_authentication_fail(self.host, self.port, self.protocol_context, err)
else:
logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) "
"with protocol context %s failed: %s",
self.host, self.port, self.client.protocol_context, err, exc_info=True)
self.host, self.port, self.protocol_context, err, exc_info=True)
if self._enabled_connection_listener:
self._connection_listener.publish_handshake_fail(self.host, self.port, self.protocol_context, err)

def _on_connection_lost(self, err=None, expected=False):
if expected and logger.isEnabledFor(logging.DEBUG):
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
self.host, self.port, self.uuid)
if expected:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
self.host, self.port, self.uuid)
if self._enabled_connection_listener:
self._connection_listener.publish_connection_closed(self.host, self.port, self.uuid)
else:
logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s",
self.host, self.port, self.uuid, err)
if self._enabled_connection_listener:
self._connection_listener.publish_connection_lost(self.host, self.port, self.uuid, err)

@property
def _enabled_connection_listener(self):
return self.client._event_listeners and self.client._event_listeners.enabled_connection_listener

@property
def _connection_listener(self):
return self.client._event_listeners


class Connection(BaseConnection):
Expand Down Expand Up @@ -216,10 +239,10 @@ def connect(self):
self._on_handshake_fail(e)
raise e
except Exception as e:
self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
self._on_handshake_fail(e)
raise e

self._on_handshake_success(result)
Expand Down Expand Up @@ -260,7 +283,7 @@ def reconnect(self):
if self.alive:
return

self.close()
self.close(on_reconnect=True)

# connect and silence the connection errors
try:
Expand Down Expand Up @@ -352,7 +375,7 @@ def recv(self, flags=None, reconnect=True) -> bytearray:

return data

def close(self):
def close(self, on_reconnect=False):
"""
Try to mark socket closed, then unlink it. This is recommended but
not required, since sockets are automatically closed when
Expand All @@ -364,5 +387,6 @@ def close(self):
self._socket.close()
except connection_errors:
pass
self._on_connection_lost(expected=True)
if not on_reconnect and not self.failed:
self._on_connection_lost(expected=True)
self._socket = None
3 changes: 3 additions & 0 deletions pyignite/connection/protocol_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def _ensure_consistency(self):
if not self.is_feature_flags_supported():
self._features = None

def copy(self):
return ProtocolContext(self.version, self.features)

@property
def version(self):
return getattr(self, '_version', None)
Expand Down
Loading