Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 0 additions & 48 deletions .travis.yml

This file was deleted.

25 changes: 19 additions & 6 deletions pyignite/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

from pyignite.constants import *
from pyignite.exceptions import (
HandshakeError, ParameterError, SocketError, connection_errors,
HandshakeError, ParameterError, SocketError, connection_errors, AuthenticationError,
)
from pyignite.datatypes import Byte, Int, Short, String, UUIDObject
from pyignite.datatypes.internal import Struct
Expand All @@ -43,6 +43,8 @@
from .ssl import wrap
from ..stream import BinaryStream, READ_BACKWARD

CLIENT_STATUS_AUTH_FAILURE = 2000


class Connection:
"""
Expand Down Expand Up @@ -180,7 +182,7 @@ def read_response(self) -> Union[dict, OrderedDict]:
('length', Int),
('op_code', Byte),
])
with BinaryStream(self, self.recv()) as stream:
with BinaryStream(self, self.recv(reconnect=False)) as stream:
start_class = response_start.parse(stream)
start = stream.read_ctype(start_class, direction=READ_BACKWARD)
data = response_start.to_python(start)
Expand All @@ -191,6 +193,7 @@ def read_response(self) -> Union[dict, OrderedDict]:
('version_minor', Short),
('version_patch', Short),
('message', String),
('client_status', Int)
])
elif self.get_protocol_version() >= (1, 4, 0):
response_end = Struct([
Expand Down Expand Up @@ -267,7 +270,7 @@ def _connect_version(

with BinaryStream(self) as stream:
hs_request.from_python(stream)
self.send(stream.getbuffer())
self.send(stream.getbuffer(), reconnect=False)

hs_response = self.read_response()
if hs_response['op_code'] == 0:
Expand All @@ -291,6 +294,8 @@ def _connect_version(
client_patch=protocol_version[2],
**hs_response
)
elif hs_response['client_status'] == CLIENT_STATUS_AUTH_FAILURE:
raise AuthenticationError(error_text)
raise HandshakeError((
hs_response['version_major'],
hs_response['version_minor'],
Expand All @@ -313,12 +318,13 @@ def reconnect(self):
except connection_errors:
pass

def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
def send(self, data: Union[bytes, bytearray, memoryview], flags=None, reconnect=True):
"""
Send data down the socket.

:param data: bytes to send,
:param flags: (optional) OS-specific flags.
:param reconnect: (optional) reconnect on failure, default True.
"""
if self.closed:
raise SocketError('Attempt to use closed connection.')
Expand All @@ -334,7 +340,13 @@ def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
self.reconnect()
raise

def recv(self, flags=None) -> bytearray:
def recv(self, flags=None, reconnect=True) -> bytearray:
"""
Receive data from the socket.

:param flags: (optional) OS-specific flags.
:param reconnect: (optional) reconnect on failure, default True.
"""
def _recv(buffer, num_bytes):
bytes_to_receive = num_bytes
while bytes_to_receive > 0:
Expand All @@ -344,7 +356,8 @@ def _recv(buffer, num_bytes):
raise SocketError('Connection broken.')
except connection_errors:
self.failed = True
self.reconnect()
if reconnect:
self.reconnect()
raise

buffer = buffer[bytes_rcvd:]
Expand Down
2 changes: 1 addition & 1 deletion pyignite/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
PROTOCOL_STRING_ENCODING = 'utf-8'
PROTOCOL_CHAR_ENCODING = 'utf-16le'

SSL_DEFAULT_VERSION = ssl.PROTOCOL_TLSv1_1
SSL_DEFAULT_VERSION = ssl.PROTOCOL_TLSv1_2
SSL_DEFAULT_CIPHERS = ssl._DEFAULT_CIPHERS

FNV1_OFFSET_BASIS = 0x811c9dc5
Expand Down
9 changes: 9 additions & 0 deletions pyignite/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ class ParseError(Exception):
pass


class AuthenticationError(Exception):
"""
This exception is raised on authentication failure.
"""

def __init__(self, message: str):
self.message = message


class HandshakeError(SocketError):
"""
This exception is raised on Ignite binary protocol handshake failure,
Expand Down
2 changes: 1 addition & 1 deletion requirements/install.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# these pip packages are necessary for the pyignite to run

attrs==18.1.0
attrs==20.3.0
2 changes: 1 addition & 1 deletion requirements/setup.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# additional package for integrating pytest in setuptools

pytest-runner==4.2
pytest-runner==5.3.0
8 changes: 4 additions & 4 deletions requirements/tests.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# these packages are used for testing

pytest==3.6.1
pytest-cov==2.5.1
teamcity-messages==1.21
psutil==5.6.5
pytest==6.2.2
pytest-cov==2.11.1
teamcity-messages==1.28
psutil==5.8.0
jinja2==2.11.3
72 changes: 72 additions & 0 deletions tests/affinity/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from pyignite import Client
from pyignite.api import cache_create, cache_destroy
from tests.util import start_ignite_gen


@pytest.fixture(scope='module', autouse=True)
def server1():
yield from start_ignite_gen(1)


@pytest.fixture(scope='module', autouse=True)
def server2():
yield from start_ignite_gen(2)


@pytest.fixture(scope='module', autouse=True)
def server3():
yield from start_ignite_gen(3)


@pytest.fixture
def client():
client = Client(partition_aware=True)

client.connect([('127.0.0.1', 10800 + i) for i in range(1, 4)])

yield client

client.close()


@pytest.fixture
def client_not_connected():
client = Client(partition_aware=True)
yield client
client.close()


@pytest.fixture
def cache(connected_client):
cache_name = 'my_bucket'
conn = connected_client.random_node

cache_create(conn, cache_name)
yield cache_name
cache_destroy(conn, cache_name)


@pytest.fixture(scope='module', autouse=True)
def skip_if_no_affinity(request, server1):
client = Client(partition_aware=True)
client.connect('127.0.0.1', 10801)

if not client.partition_awareness_supported_by_protocol:
pytest.skip(f'skipped {request.node.name}, partition awareness is not supported.')
43 changes: 16 additions & 27 deletions tests/test_affinity.py → tests/affinity/test_affinity.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
from pyignite.datatypes.prop_codes import *


def test_get_node_partitions(client_partition_aware):
def test_get_node_partitions(client):
conn = client.random_node

conn = client_partition_aware.random_node

cache_1 = client_partition_aware.get_or_create_cache('test_cache_1')
cache_2 = client_partition_aware.get_or_create_cache({
cache_1 = client.get_or_create_cache('test_cache_1')
cache_2 = client.get_or_create_cache({
PROP_NAME: 'test_cache_2',
PROP_CACHE_KEY_CONFIGURATION: [
{
Expand All @@ -41,9 +40,9 @@ def test_get_node_partitions(client_partition_aware):
}
],
})
cache_3 = client_partition_aware.get_or_create_cache('test_cache_3')
cache_4 = client_partition_aware.get_or_create_cache('test_cache_4')
cache_5 = client_partition_aware.get_or_create_cache('test_cache_5')
client.get_or_create_cache('test_cache_3')
client.get_or_create_cache('test_cache_4')
client.get_or_create_cache('test_cache_5')

result = cache_get_node_partitions(
conn,
Expand Down Expand Up @@ -115,9 +114,8 @@ def test_get_node_partitions(client_partition_aware):

],
)
def test_affinity(client_partition_aware, key, key_hint):

cache_1 = client_partition_aware.get_or_create_cache({
def test_affinity(client, key, key_hint):
cache_1 = client.get_or_create_cache({
PROP_NAME: 'test_cache_1',
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
Expand All @@ -126,7 +124,7 @@ def test_affinity(client_partition_aware, key, key_hint):

best_node = cache_1.get_best_node(key, key_hint=key_hint)

for node in filter(lambda n: n.alive, client_partition_aware._nodes):
for node in filter(lambda n: n.alive, client._nodes):
result = cache_local_peek(
node, cache_1.cache_id, key, key_hint=key_hint,
)
Expand All @@ -142,9 +140,8 @@ def test_affinity(client_partition_aware, key, key_hint):
cache_1.destroy()


def test_affinity_for_generic_object(client_partition_aware):

cache_1 = client_partition_aware.get_or_create_cache({
def test_affinity_for_generic_object(client):
cache_1 = client.get_or_create_cache({
PROP_NAME: 'test_cache_1',
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
Expand All @@ -166,7 +163,7 @@ class KeyClass(

best_node = cache_1.get_best_node(key, key_hint=BinaryObject)

for node in filter(lambda n: n.alive, client_partition_aware._nodes):
for node in filter(lambda n: n.alive, client._nodes):
result = cache_local_peek(
node, cache_1.cache_id, key, key_hint=BinaryObject,
)
Expand All @@ -182,16 +179,8 @@ class KeyClass(
cache_1.destroy()


def test_affinity_for_generic_object_without_type_hints(client_partition_aware):

if not client_partition_aware.partition_awareness_supported_by_protocol:
pytest.skip(
'Best effort affinity is not supported by the protocol {}.'.format(
client_partition_aware.protocol_version
)
)

cache_1 = client_partition_aware.get_or_create_cache({
def test_affinity_for_generic_object_without_type_hints(client):
cache_1 = client.get_or_create_cache({
PROP_NAME: 'test_cache_1',
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
Expand All @@ -213,7 +202,7 @@ class KeyClass(

best_node = cache_1.get_best_node(key)

for node in filter(lambda n: n.alive, client_partition_aware._nodes):
for node in filter(lambda n: n.alive, client._nodes):
result = cache_local_peek(
node, cache_1.cache_id, key
)
Expand Down
Loading