Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions pyignite/api/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,36 +283,31 @@ def sql_fields(
Performs SQL fields query.

:param conn: connection to Ignite server,
:param cache: name or ID of the cache,
:param cache: name or ID of the cache. If zero, then schema is used.
:param query_str: SQL query string,
:param page_size: cursor page size,
:param query_args: (optional) query arguments. List of values or
(value, type hint) tuples,
:param schema: (optional) schema for the query. Defaults to `PUBLIC`,
:param schema: schema for the query.
:param statement_type: (optional) statement type. Can be:

* StatementType.ALL − any type (default),
* StatementType.SELECT − select,
* StatementType.UPDATE − update.

:param distributed_joins: (optional) distributed joins. Defaults to False,
:param distributed_joins: (optional) distributed joins.
:param local: (optional) pass True if this query should be executed
on local node only. Defaults to False,
on local node only.
:param replicated_only: (optional) whether query contains only
replicated tables or not. Defaults to False,
:param enforce_join_order: (optional) enforce join order. Defaults
to False,
replicated tables or not.
:param enforce_join_order: (optional) enforce join order.
:param collocated: (optional) whether your data is co-located or not.
Defaults to False,
:param lazy: (optional) lazy query execution. Defaults to False,
:param lazy: (optional) lazy query execution.
:param include_field_names: (optional) include field names in result.
Defaults to False,
:param max_rows: (optional) query-wide maximum of rows. Defaults to -1
(all rows),
:param max_rows: (optional) query-wide maximum of rows.
:param timeout: (optional) non-negative timeout value in ms. Zero disables
timeout (default),
timeout.
:param binary: (optional) pass True to keep the value in binary form.
False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
Expand Down
18 changes: 12 additions & 6 deletions pyignite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
BinaryTypeError, CacheError, ReconnectError, SQLError, connection_errors,
)
from .utils import (
capitalize, entity_id, schema_id, process_delimiter,
cache_id, capitalize, entity_id, schema_id, process_delimiter,
status_to_exception, is_iterable,
)
from .binary import GenericObjectMeta
Expand Down Expand Up @@ -513,13 +513,14 @@ def get_cache_names(self) -> list:
return cache_get_names(self.random_node)

def sql(
self, query_str: str, page_size: int = 1024, query_args: Iterable = None,
schema: Union[int, str] = 'PUBLIC',
self, query_str: str, page_size: int = 1024,
query_args: Iterable = None, schema: str = 'PUBLIC',
statement_type: int = 0, distributed_joins: bool = False,
local: bool = False, replicated_only: bool = False,
enforce_join_order: bool = False, collocated: bool = False,
lazy: bool = False, include_field_names: bool = False,
max_rows: int = -1, timeout: int = 0,
cache: Union[int, str, Cache] = None
):
"""
Runs an SQL query and returns its result.
Expand Down Expand Up @@ -553,6 +554,8 @@ def sql(
(all rows),
:param timeout: (optional) non-negative timeout value in ms.
Zero disables timeout (default),
:param cache (optional) Name or ID of the cache to use to infer schema.
If set, 'schema' argument is ignored,
:return: generator with result rows as a lists. If
`include_field_names` was set, the first row will hold field names.
"""
Expand Down Expand Up @@ -580,10 +583,13 @@ def generate_result(value):

conn = self.random_node

schema = self.get_cache(schema)
c_id = cache.cache_id if isinstance(cache, Cache) else cache_id(cache)

if c_id != 0:
schema = None

result = sql_fields(
conn, schema.cache_id, query_str,
page_size, query_args, schema.name,
conn, c_id, query_str, page_size, query_args, schema,
statement_type, distributed_joins, local, replicated_only,
enforce_join_order, collocated, lazy, include_field_names,
max_rows, timeout,
Expand Down
3 changes: 3 additions & 0 deletions pyignite/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ def hashcode(data: Union[str, bytes, bytearray, memoryview]) -> int:


def __hashcode_fallback(data: Union[str, bytes, bytearray, memoryview]) -> int:
if data is None:
return 0

if isinstance(data, str):
"""
For strings we iterate over code point which are of the int type
Expand Down
5 changes: 0 additions & 5 deletions tests/test_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@


def test_sql_read_as_binary(client):

client.get_or_create_cache(scheme_name)
client.sql(drop_query)

# create table
Expand Down Expand Up @@ -92,9 +90,6 @@ def test_sql_read_as_binary(client):


def test_sql_write_as_binary(client):

client.get_or_create_cache(scheme_name)

# configure cache as an SQL table
type_name = table_cache_name

Expand Down
4 changes: 1 addition & 3 deletions tests/test_cache_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ def test_cache_remove(client):


def test_cache_get(client):
client.get_or_create_cache('my_cache')

my_cache = client.get_cache('my_cache')
my_cache = client.get_or_create_cache('my_cache')
assert my_cache.settings[PROP_NAME] == 'my_cache'
my_cache.destroy()

Expand Down
84 changes: 73 additions & 11 deletions tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
sql, sql_cursor_get_page,
cache_get_configuration,
)
from pyignite.datatypes.cache_config import CacheMode
from pyignite.datatypes.prop_codes import *
from pyignite.exceptions import SQLError
from pyignite.utils import entity_id
from pyignite.binary import unwrap_binary


initial_data = [
('John', 'Doe', 5),
('Jane', 'Roe', 4),
Expand Down Expand Up @@ -59,9 +59,10 @@ def test_sql(client):

result = sql_fields(
conn,
'PUBLIC',
0,
create_query,
page_size,
schema='PUBLIC',
include_field_names=True
)
assert result.status == 0, result.message
Expand All @@ -70,9 +71,10 @@ def test_sql(client):
fname, lname, grade = data_line
result = sql_fields(
conn,
'PUBLIC',
0,
insert_query,
page_size,
schema='PUBLIC',
query_args=[i, fname, lname, grade],
include_field_names=True
)
Expand Down Expand Up @@ -108,7 +110,7 @@ def test_sql(client):
assert data.type_id == entity_id(binary_type_name)

# repeat cleanup
result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
assert result.status == 0


Expand All @@ -121,9 +123,10 @@ def test_sql_fields(client):

result = sql_fields(
conn,
'PUBLIC',
0,
create_query,
page_size,
schema='PUBLIC',
include_field_names=True
)
assert result.status == 0, result.message
Expand All @@ -132,19 +135,21 @@ def test_sql_fields(client):
fname, lname, grade = data_line
result = sql_fields(
conn,
'PUBLIC',
0,
insert_query,
page_size,
schema='PUBLIC',
query_args=[i, fname, lname, grade],
include_field_names=True
)
assert result.status == 0, result.message

result = sql_fields(
conn,
'PUBLIC',
0,
select_query,
page_size,
schema='PUBLIC',
include_field_names=True
)
assert result.status == 0
Expand All @@ -159,7 +164,7 @@ def test_sql_fields(client):
assert result.value['more'] is False

# repeat cleanup
result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
assert result.status == 0


Expand All @@ -176,7 +181,7 @@ def test_long_multipage_query(client):

client.sql('DROP TABLE LongMultipageQuery IF EXISTS')

client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" % \
client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" %
(fields[0] + " INT(11) PRIMARY KEY", ",".join(map(lambda f: f + " INT(11)", fields[1:]))))

for id in range(1, 21):
Expand All @@ -193,6 +198,63 @@ def test_long_multipage_query(client):
client.sql(drop_query)


def test_sql_not_create_cache(client):
def test_sql_not_create_cache_with_schema(client):
with pytest.raises(SQLError, match=r".*Cache does not exist.*"):
client.sql(schema='IS_NOT_EXISTING', query_str='select * from IsNotExisting')
client.sql(schema=None, cache='NOT_EXISTING', query_str='select * from NotExisting')


def test_sql_not_create_cache_with_cache(client):
with pytest.raises(SQLError, match=r".*Failed to set schema.*"):
client.sql(schema='NOT_EXISTING', query_str='select * from NotExisting')


def test_query_with_cache(client):
test_key = 42
test_value = 'Lorem ipsum'

cache_name = test_query_with_cache.__name__.upper()
schema_name = f'{cache_name}_schema'.upper()
table_name = f'{cache_name}_table'.upper()

cache = client.create_cache({
PROP_NAME: cache_name,
PROP_SQL_SCHEMA: schema_name,
PROP_CACHE_MODE: CacheMode.PARTITIONED,
PROP_QUERY_ENTITIES: [
{
'table_name': table_name,
'key_field_name': 'KEY',
'value_field_name': 'VALUE',
'key_type_name': 'java.lang.Long',
'value_type_name': 'java.lang.String',
'query_indexes': [],
'field_name_aliases': [],
'query_fields': [
{
'name': 'KEY',
'type_name': 'java.lang.Long',
'is_key_field': True,
'is_notnull_constraint_field': True,
},
{
'name': 'VALUE',
'type_name': 'java.lang.String',
},
],
},
],
})

cache.put(test_key, test_value)

args_to_check = [
('schema', schema_name),
('cache', cache),
('cache', cache.name),
('cache', cache.cache_id)
]

for param, value in args_to_check:
page = client.sql(f'select value from {table_name}', **{param: value})
received = next(page)[0]
assert test_value == received