Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ jobs:
env: TOXENV=py39

install: pip install tox
script: tox
script: tox
6 changes: 6 additions & 0 deletions examples/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ def sync_example():


if __name__ == '__main__':
client = Client()
with client.connect('127.0.0.1', 10800):
if not client.protocol_context.is_transactions_supported():
print("'Transactions' API is not supported by cluster. Finishing...")
exit(0)

print("Starting sync example")
sync_example()

Expand Down
35 changes: 34 additions & 1 deletion pyignite/aio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
"""
Initialize client.

For the use of the SSL-related parameters see
https://docs.python.org/3/library/ssl.html#ssl-certificates.

:param compact_footer: (optional) use compact (True, recommended) or
full (False) schema approach when serializing Complex objects.
Default is to use the same approach the server is using (None).
Expand All @@ -73,7 +76,37 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
:param partition_aware: (optional) try to calculate the exact data
placement from the key before to issue the key operation to the
server node, `True` by default,
:param event_listeners: (optional) event listeners.
:param event_listeners: (optional) event listeners,
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
with node. Default is 10.0 seconds,
:param use_ssl: (optional) set to True if Ignite server uses SSL
on its binary connector. Defaults to use SSL when username
and password has been supplied, not to use SSL otherwise,
:param ssl_version: (optional) SSL version constant from standard
`ssl` module. Defaults to TLS v1.2,
:param ssl_ciphers: (optional) ciphers to use. If not provided,
`ssl` default ciphers are used,
:param ssl_cert_reqs: (optional) determines how the remote side
certificate is treated:

* `ssl.CERT_NONE` − remote certificate is ignored (default),
* `ssl.CERT_OPTIONAL` − remote certificate will be validated,
if provided,
* `ssl.CERT_REQUIRED` − valid remote certificate is required,

:param ssl_keyfile: (optional) a path to SSL key file to identify
local (client) party,
:param ssl_keyfile_password: (optional) password for SSL key file,
can be provided when key file is encrypted to prevent OpenSSL
password prompt,
:param ssl_certfile: (optional) a path to ssl certificate file
to identify local (client) party,
:param ssl_ca_certfile: (optional) a path to a trusted certificate
or a certificate chain. Required to check the validity of the remote
(server-side) certificate,
:param username: (optional) user name to authenticate to Ignite
cluster,
:param password: (optional) password to authenticate to Ignite cluster.
"""
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
self._registry_mux = asyncio.Lock()
Expand Down
39 changes: 38 additions & 1 deletion pyignite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
"""
Initialize client.

For the use of the SSL-related parameters see
https://docs.python.org/3/library/ssl.html#ssl-certificates.

:param compact_footer: (optional) use compact (True, recommended) or
full (False) schema approach when serializing Complex objects.
Default is to use the same approach the server is using (None).
Expand All @@ -354,7 +357,41 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
:param partition_aware: (optional) try to calculate the exact data
placement from the key before to issue the key operation to the
server node, `True` by default,
:param event_listeners: (optional) event listeners.
:param event_listeners: (optional) event listeners,
:param timeout: (optional) sets timeout (in seconds) for each socket
operation including `connect`. 0 means non-blocking mode, which is
virtually guaranteed to fail. Can accept integer or float value.
Default is None (blocking mode),
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
with node. Default is 10.0 seconds,
:param use_ssl: (optional) set to True if Ignite server uses SSL
on its binary connector. Defaults to use SSL when username
and password has been supplied, not to use SSL otherwise,
:param ssl_version: (optional) SSL version constant from standard
`ssl` module. Defaults to TLS v1.2,
:param ssl_ciphers: (optional) ciphers to use. If not provided,
`ssl` default ciphers are used,
:param ssl_cert_reqs: (optional) determines how the remote side
certificate is treated:

* `ssl.CERT_NONE` − remote certificate is ignored (default),
* `ssl.CERT_OPTIONAL` − remote certificate will be validated,
if provided,
* `ssl.CERT_REQUIRED` − valid remote certificate is required,

:param ssl_keyfile: (optional) a path to SSL key file to identify
local (client) party,
:param ssl_keyfile_password: (optional) password for SSL key file,
can be provided when key file is encrypted to prevent OpenSSL
password prompt,
:param ssl_certfile: (optional) a path to ssl certificate file
to identify local (client) party,
:param ssl_ca_certfile: (optional) a path to a trusted certificate
or a certificate chain. Required to check the validity of the remote
(server-side) certificate,
:param username: (optional) user name to authenticate to Ignite
cluster,
:param password: (optional) password to authenticate to Ignite cluster.
"""
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)

Expand Down
48 changes: 27 additions & 21 deletions pyignite/connection/aio_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,13 @@ def __init__(self, client: 'AioClient', host: str, port: int, username: str = No
:param client: Ignite client object,
:param host: Ignite server node's host name or IP,
:param port: Ignite server node's port number,
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
with node. Default is 10.0 seconds,
:param use_ssl: (optional) set to True if Ignite server uses SSL
on its binary connector. Defaults to use SSL when username
and password has been supplied, not to use SSL otherwise,
:param ssl_version: (optional) SSL version constant from standard
`ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
`ssl` module. Defaults to TLS v1.2,
:param ssl_ciphers: (optional) ciphers to use. If not provided,
`ssl` default ciphers are used,
:param ssl_cert_reqs: (optional) determines how the remote side
Expand Down Expand Up @@ -165,7 +167,6 @@ async def connect(self):
"""
if self.alive:
return
self._closed = False
await self._connect()

async def _connect(self):
Expand All @@ -176,27 +177,28 @@ async def _connect(self):
detecting_protocol = True
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())

try:
self._on_handshake_start()
result = await self._connect_version()
except HandshakeError as e:
if e.expected_version in PROTOCOLS:
self.client.protocol_context.version = e.expected_version
while True:
try:
self._on_handshake_start()
result = await self._connect_version()
else:
self._on_handshake_success(result)
return
except HandshakeError as e:
if e.expected_version in PROTOCOLS:
self.client.protocol_context.version = e.expected_version
continue
else:
self._on_handshake_fail(e)
raise e
except AuthenticationError as e:
self._on_handshake_fail(e)
raise e
except AuthenticationError as e:
self._on_handshake_fail(e)
raise e
except Exception as e:
self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
raise e

self._on_handshake_success(result)
except Exception as e:
self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
raise e

def process_connection_lost(self, err, reconnect=False):
self.failed = True
Expand Down Expand Up @@ -225,9 +227,13 @@ async def _connect_version(self) -> Union[dict, OrderedDict]:

ssl_context = create_ssl_context(self.ssl_params)
handshake_fut = self._loop.create_future()
self._closed = False
self._transport, _ = await self._loop.create_connection(lambda: BaseProtocol(self, handshake_fut),
host=self.host, port=self.port, ssl=ssl_context)
hs_response = await handshake_fut
try:
hs_response = await asyncio.wait_for(handshake_fut, self.handshake_timeout)
except asyncio.TimeoutError:
raise ConnectionError('timed out')

if hs_response.op_code == 0:
await self.close()
Expand Down
59 changes: 34 additions & 25 deletions pyignite/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Union

from pyignite.constants import PROTOCOLS, IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER
from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError
from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError, ParameterError
from .bitmask_feature import BitmaskFeature

from .handshake import HandshakeRequest, HandshakeResponse
Expand All @@ -34,14 +34,18 @@

class BaseConnection:
def __init__(self, client, host: str = None, port: int = None, username: str = None, password: str = None,
**ssl_params):
handshake_timeout: float = 10.0, **ssl_params):
self.client = client
self.handshake_timeout = handshake_timeout
self.host = host if host else IGNITE_DEFAULT_HOST
self.port = port if port else IGNITE_DEFAULT_PORT
self.username = username
self.password = password
self.uuid = None

if handshake_timeout <= 0.0:
raise ParameterError("handshake_timeout should be positive")

check_ssl_params(ssl_params)

if self.username and self.password and 'use_ssl' not in ssl_params:
Expand Down Expand Up @@ -162,8 +166,9 @@ class Connection(BaseConnection):
* binary protocol connector. Encapsulates handshake and failover reconnection.
"""

def __init__(self, client: 'Client', host: str, port: int, timeout: float = None,
username: str = None, password: str = None, **ssl_params):
def __init__(self, client: 'Client', host: str, port: int, username: str = None, password: str = None,
timeout: float = None, handshake_timeout: float = 10.0,
**ssl_params):
"""
Initialize connection.

Expand All @@ -177,11 +182,13 @@ def __init__(self, client: 'Client', host: str, port: int, timeout: float = None
operation including `connect`. 0 means non-blocking mode, which is
virtually guaranteed to fail. Can accept integer or float value.
Default is None (blocking mode),
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
with node. Default is 10.0.
:param use_ssl: (optional) set to True if Ignite server uses SSL
on its binary connector. Defaults to use SSL when username
and password has been supplied, not to use SSL otherwise,
:param ssl_version: (optional) SSL version constant from standard
`ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
`ssl` module. Defaults to TLS v1.2,
:param ssl_ciphers: (optional) ciphers to use. If not provided,
`ssl` default ciphers are used,
:param ssl_cert_reqs: (optional) determines how the remote side
Expand All @@ -206,7 +213,7 @@ def __init__(self, client: 'Client', host: str, port: int, timeout: float = None
cluster,
:param password: (optional) password to authenticate to Ignite cluster.
"""
super().__init__(client, host, port, username, password, **ssl_params)
super().__init__(client, host, port, username, password, handshake_timeout, **ssl_params)
self.timeout = timeout
self._socket = None

Expand All @@ -225,27 +232,29 @@ def connect(self):
detecting_protocol = True
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())

try:
self._on_handshake_start()
result = self._connect_version()
except HandshakeError as e:
if e.expected_version in PROTOCOLS:
self.client.protocol_context.version = e.expected_version
while True:
try:
self._on_handshake_start()
result = self._connect_version()
else:
self._socket.settimeout(self.timeout)
self._on_handshake_success(result)
return
except HandshakeError as e:
if e.expected_version in PROTOCOLS:
self.client.protocol_context.version = e.expected_version
continue
else:
self._on_handshake_fail(e)
raise e
except AuthenticationError as e:
self._on_handshake_fail(e)
raise e
except AuthenticationError as e:
self._on_handshake_fail(e)
raise e
except Exception as e:
self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
raise e

self._on_handshake_success(result)
except Exception as e:
self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
raise e

def _connect_version(self) -> Union[dict, OrderedDict]:
"""
Expand All @@ -254,7 +263,7 @@ def _connect_version(self) -> Union[dict, OrderedDict]:
"""

self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(self.timeout)
self._socket.settimeout(self.handshake_timeout)
self._socket = wrap(self._socket, self.ssl_params)
self._socket.connect((self.host, self.port))

Expand Down
3 changes: 3 additions & 0 deletions pyignite/connection/protocol_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def __eq__(self, other):
def __str__(self):
return f'ProtocolContext(version={self._version}, features={self._features})'

def __repr__(self):
return self.__str__()

def _ensure_consistency(self):
if not self.is_feature_flags_supported():
self._features = None
Expand Down
2 changes: 1 addition & 1 deletion pyignite/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


def _validate_int_enum_param(value: Union[int, IntEnum], cls: Type[IntEnum]):
if value not in cls:
if value not in set(v.value for v in cls): # Use this trick to disable warning on python 3.7
raise ValueError(f'{value} not in {cls}')
return value

Expand Down
18 changes: 9 additions & 9 deletions tests/common/test_query_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pyignite import Client, AioClient
from pyignite.exceptions import CacheError
from pyignite.monitoring import QueryEventListener, QueryStartEvent, QueryFailEvent, QuerySuccessEvent
from pyignite.queries.op_codes import OP_CACHE_PUT, OP_CACHE_PARTITIONS, OP_CLUSTER_GET_STATE
from pyignite.queries.op_codes import OP_CACHE_PUT, OP_CACHE_PARTITIONS, OP_CACHE_GET_NAMES

events = []

Expand Down Expand Up @@ -93,17 +93,17 @@ def __assert_fail_events(client):
assert ev.port == conn.port
assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
assert 'Cache does not exist' in ev.err_msg
assert ev.duration > 0
assert ev.duration >= 0


def test_query_success_events(client):
client.get_cluster().get_state()
client.get_cache_names()
__assert_success_events(client)


@pytest.mark.asyncio
async def test_query_success_events_async(async_client):
await async_client.get_cluster().get_state()
await async_client.get_cache_names()
__assert_success_events(async_client)


Expand All @@ -112,16 +112,16 @@ def __assert_success_events(client):
conn = client._nodes[0]
for ev in events:
if isinstance(ev, QueryStartEvent):
assert ev.op_code == OP_CLUSTER_GET_STATE
assert ev.op_name == 'OP_CLUSTER_GET_STATE'
assert ev.op_code == OP_CACHE_GET_NAMES
assert ev.op_name == 'OP_CACHE_GET_NAMES'
assert ev.host == conn.host
assert ev.port == conn.port
assert ev.node_uuid == str(conn.uuid if conn.uuid else '')

if isinstance(ev, QuerySuccessEvent):
assert ev.op_code == OP_CLUSTER_GET_STATE
assert ev.op_name == 'OP_CLUSTER_GET_STATE'
assert ev.op_code == OP_CACHE_GET_NAMES
assert ev.op_name == 'OP_CACHE_GET_NAMES'
assert ev.host == conn.host
assert ev.port == conn.port
assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
assert ev.duration > 0
assert ev.duration >= 0
Loading