From 3b910c376b2f805305e057614c3ddec11d7e1a80 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Thu, 18 Feb 2021 15:47:06 -0800 Subject: [PATCH 1/6] IGNITE-14211 Remove existing cache requirement from SQL API --- pyignite/api/sql.py | 23 +++++------- pyignite/client.py | 23 +++++++++--- tests/test_binary.py | 5 --- tests/test_cache_class.py | 4 +- tests/test_sql.py | 78 +++++++++++++++++++++++++++++++++++---- 5 files changed, 97 insertions(+), 36 deletions(-) diff --git a/pyignite/api/sql.py b/pyignite/api/sql.py index 73cacc6..dc470d1 100644 --- a/pyignite/api/sql.py +++ b/pyignite/api/sql.py @@ -283,36 +283,31 @@ def sql_fields( Performs SQL fields query. :param conn: connection to Ignite server, - :param cache: name or ID of the cache, + :param cache: name or ID of the cache. If zero, then schema is used. :param query_str: SQL query string, :param page_size: cursor page size, :param query_args: (optional) query arguments. List of values or (value, type hint) tuples, - :param schema: (optional) schema for the query. Defaults to `PUBLIC`, + :param schema: schema for the query. :param statement_type: (optional) statement type. Can be: * StatementType.ALL − any type (default), * StatementType.SELECT − select, * StatementType.UPDATE − update. - :param distributed_joins: (optional) distributed joins. Defaults to False, + :param distributed_joins: (optional) distributed joins. :param local: (optional) pass True if this query should be executed - on local node only. Defaults to False, + on local node only. :param replicated_only: (optional) whether query contains only - replicated tables or not. Defaults to False, - :param enforce_join_order: (optional) enforce join order. Defaults - to False, + replicated tables or not. + :param enforce_join_order: (optional) enforce join order. :param collocated: (optional) whether your data is co-located or not. - Defaults to False, - :param lazy: (optional) lazy query execution. 
Defaults to False, + :param lazy: (optional) lazy query execution. :param include_field_names: (optional) include field names in result. - Defaults to False, - :param max_rows: (optional) query-wide maximum of rows. Defaults to -1 - (all rows), + :param max_rows: (optional) query-wide maximum of rows. :param timeout: (optional) non-negative timeout value in ms. Zero disables - timeout (default), + timeout. :param binary: (optional) pass True to keep the value in binary form. - False by default, :param query_id: (optional) a value generated by client and returned as-is in response.query_id. When the parameter is omitted, a random value is generated, diff --git a/pyignite/client.py b/pyignite/client.py index 77c6373..eed6917 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -513,13 +513,13 @@ def get_cache_names(self) -> list: return cache_get_names(self.random_node) def sql( - self, query_str: str, page_size: int = 1024, query_args: Iterable = None, - schema: Union[int, str] = 'PUBLIC', + self, query_str: str, page_size: int = 1024, + query_args: Iterable = None, schema: str = 'PUBLIC', statement_type: int = 0, distributed_joins: bool = False, local: bool = False, replicated_only: bool = False, enforce_join_order: bool = False, collocated: bool = False, lazy: bool = False, include_field_names: bool = False, - max_rows: int = -1, timeout: int = 0, + max_rows: int = -1, timeout: int = 0, cache: Union[int, str] = None ): """ Runs an SQL query and returns its result. @@ -553,6 +553,8 @@ def sql( (all rows), :param timeout: (optional) non-negative timeout value in ms. Zero disables timeout (default), + :param cache: (optional) name or ID of the cache to use to infer schema. + If set, 'schema' argument is ignored, :return: generator with result rows as a lists. If `include_field_names` was set, the first row will hold field names. 
""" @@ -580,10 +582,19 @@ def generate_result(value): conn = self.random_node - schema = self.get_cache(schema) + if isinstance(cache, int): + cache_id = cache + elif isinstance(cache, str): + cache_id = self.get_cache(cache).cache_id + else: + cache_id = 0 + + if cache_id != 0: + schema = None + result = sql_fields( - conn, schema.cache_id, query_str, - page_size, query_args, schema.name, + conn, cache_id, query_str, + page_size, query_args, schema, statement_type, distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout, diff --git a/tests/test_binary.py b/tests/test_binary.py index 45d1d25..5fa2ec4 100644 --- a/tests/test_binary.py +++ b/tests/test_binary.py @@ -63,8 +63,6 @@ def test_sql_read_as_binary(client): - - client.get_or_create_cache(scheme_name) client.sql(drop_query) # create table @@ -92,9 +90,6 @@ def test_sql_read_as_binary(client): def test_sql_write_as_binary(client): - - client.get_or_create_cache(scheme_name) - # configure cache as an SQL table type_name = table_cache_name diff --git a/tests/test_cache_class.py b/tests/test_cache_class.py index 1df0d44..940160a 100644 --- a/tests/test_cache_class.py +++ b/tests/test_cache_class.py @@ -62,9 +62,7 @@ def test_cache_remove(client): def test_cache_get(client): - client.get_or_create_cache('my_cache') - - my_cache = client.get_cache('my_cache') + my_cache = client.get_or_create_cache('my_cache') assert my_cache.settings[PROP_NAME] == 'my_cache' my_cache.destroy() diff --git a/tests/test_sql.py b/tests/test_sql.py index c896afb..60950fb 100644 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -20,12 +20,12 @@ sql, sql_cursor_get_page, cache_get_configuration, ) +from pyignite.datatypes.cache_config import CacheMode from pyignite.datatypes.prop_codes import * from pyignite.exceptions import SQLError from pyignite.utils import entity_id from pyignite.binary import unwrap_binary - initial_data = [ ('John', 'Doe', 5), ('Jane', 'Roe', 4), @@ 
-59,9 +59,10 @@ def test_sql(client): result = sql_fields( conn, - 'PUBLIC', + 0, create_query, page_size, + schema='PUBLIC', include_field_names=True ) assert result.status == 0, result.message @@ -70,9 +71,10 @@ def test_sql(client): fname, lname, grade = data_line result = sql_fields( conn, - 'PUBLIC', + 0, insert_query, page_size, + schema='PUBLIC', query_args=[i, fname, lname, grade], include_field_names=True ) @@ -108,7 +110,7 @@ def test_sql(client): assert data.type_id == entity_id(binary_type_name) # repeat cleanup - result = sql_fields(conn, 'PUBLIC', drop_query, page_size) + result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC') assert result.status == 0 @@ -121,9 +123,10 @@ def test_sql_fields(client): result = sql_fields( conn, - 'PUBLIC', + 0, create_query, page_size, + schema='PUBLIC', include_field_names=True ) assert result.status == 0, result.message @@ -132,9 +135,10 @@ def test_sql_fields(client): fname, lname, grade = data_line result = sql_fields( conn, - 'PUBLIC', + 0, insert_query, page_size, + schema='PUBLIC', query_args=[i, fname, lname, grade], include_field_names=True ) @@ -142,9 +146,10 @@ def test_sql_fields(client): result = sql_fields( conn, - 'PUBLIC', + 0, select_query, page_size, + schema='PUBLIC', include_field_names=True ) assert result.status == 0 @@ -159,7 +164,7 @@ def test_sql_fields(client): assert result.value['more'] is False # repeat cleanup - result = sql_fields(conn, 'PUBLIC', drop_query, page_size) + result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC') assert result.status == 0 @@ -196,3 +201,60 @@ def test_long_multipage_query(client): def test_sql_not_create_cache(client): with pytest.raises(SQLError, match=r".*Cache does not exist.*"): client.sql(schema='IS_NOT_EXISTING', query_str='select * from IsNotExisting') + + +def test_query_with_cache(client): + test_key = 42 + test_value = 'Lorem ipsum' + + cache_name = test_query_with_cache.__name__.upper() + schema_name = 
f'{cache_name}_schema'.upper() + table_name = f'{cache_name}_table'.upper() + + cache = client.create_cache({ + PROP_NAME: cache_name, + PROP_SQL_SCHEMA: schema_name, + PROP_CACHE_MODE: CacheMode.PARTITIONED, + PROP_QUERY_ENTITIES: [ + { + 'table_name': table_name, + 'key_field_name': 'KEY', + 'value_field_name': 'VALUE', + 'key_type_name': 'java.lang.Long', + 'value_type_name': 'java.lang.String', + 'query_indexes': [], + 'field_name_aliases': [], + 'query_fields': [ + { + 'name': 'KEY', + 'type_name': 'java.lang.Long', + 'is_key_field': True, + 'is_notnull_constraint_field': True, + }, + { + 'name': 'VALUE', + 'type_name': 'java.lang.String', + }, + ], + }, + ], + }) + + qry = f'select value from {table_name}' + + cache.put(test_key, test_value) + + page = client.sql(qry, schema=schema_name) + received = next(page)[0] + + assert test_value == received + + page = client.sql(qry, cache=cache.name) + received = next(page)[0] + + assert test_value == received + + page = client.sql(qry, cache=cache.cache_id) + received = next(page)[0] + + assert test_value == received From fd662f9440260f81252976460a7a3daa9d450434 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Thu, 18 Feb 2021 16:20:01 -0800 Subject: [PATCH 2/6] IGNITE-14211: Fix test --- tests/test_sql.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/test_sql.py b/tests/test_sql.py index 60950fb..c6e570b 100644 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -198,9 +198,14 @@ def test_long_multipage_query(client): client.sql(drop_query) -def test_sql_not_create_cache(client): +def test_sql_not_create_cache_with_schema(client): with pytest.raises(SQLError, match=r".*Cache does not exist.*"): - client.sql(schema='IS_NOT_EXISTING', query_str='select * from IsNotExisting') + client.sql(schema=None, cache='NOT_EXISTING', query_str='select * from NotExisting') + + +def test_sql_not_create_cache_with_cache(client): + with pytest.raises(SQLError, match=r".*Failed to set schema.*"): + 
client.sql(schema='NOT_EXISTING', query_str='select * from NotExisting') def test_query_with_cache(client): From 5af30e9af9097c23df87acb5d48fefd09e90c143 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 19 Feb 2021 00:31:53 -0800 Subject: [PATCH 3/6] IGNITE-14211: Fix --- pyignite/client.py | 20 ++++++++++---------- tests/test_sql.py | 7 ++++++- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pyignite/client.py b/pyignite/client.py index eed6917..6db16d9 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -58,7 +58,7 @@ BinaryTypeError, CacheError, ReconnectError, SQLError, connection_errors, ) from .utils import ( - capitalize, entity_id, schema_id, process_delimiter, + cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, ) from .binary import GenericObjectMeta @@ -519,7 +519,8 @@ def sql( local: bool = False, replicated_only: bool = False, enforce_join_order: bool = False, collocated: bool = False, lazy: bool = False, include_field_names: bool = False, - max_rows: int = -1, timeout: int = 0, cache: Union[int, str] = None + max_rows: int = -1, timeout: int = 0, + cache: Union[int, str, Cache] = None ): """ Runs an SQL query and returns its result. 
@@ -582,19 +583,18 @@ def generate_result(value): conn = self.random_node - if isinstance(cache, int): - cache_id = cache - elif isinstance(cache, str): - cache_id = self.get_cache(cache).cache_id + if cache is None: + c_id = 0 + elif isinstance(cache, Cache): + c_id = cache.cache_id else: - cache_id = 0 + c_id = cache_id(cache) - if cache_id != 0: + if c_id != 0: schema = None result = sql_fields( - conn, cache_id, query_str, - page_size, query_args, schema, + conn, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout, diff --git a/tests/test_sql.py b/tests/test_sql.py index c6e570b..6ddcc71 100644 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -181,7 +181,7 @@ def test_long_multipage_query(client): client.sql('DROP TABLE LongMultipageQuery IF EXISTS') - client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" % \ + client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" % (fields[0] + " INT(11) PRIMARY KEY", ",".join(map(lambda f: f + " INT(11)", fields[1:])))) for id in range(1, 21): @@ -254,6 +254,11 @@ def test_query_with_cache(client): assert test_value == received + page = client.sql(qry, cache=cache) + received = next(page)[0] + + assert test_value == received + page = client.sql(qry, cache=cache.name) received = next(page)[0] From 21835eb6e99810e13c1b28f4d643d940379d83ee Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 19 Feb 2021 00:39:24 -0800 Subject: [PATCH 4/6] IGNITE-14211: Hashcode return 0 if None is passed --- pyignite/client.py | 4 +--- pyignite/utils.py | 4 +++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pyignite/client.py b/pyignite/client.py index 6db16d9..4da1831 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -583,9 +583,7 @@ def generate_result(value): conn = self.random_node - if cache is None: - c_id = 0 - elif isinstance(cache, Cache): + if isinstance(cache, Cache): c_id = 
cache.cache_id else: c_id = cache_id(cache) diff --git a/pyignite/utils.py b/pyignite/utils.py index 67f164f..8bd39a9 100644 --- a/pyignite/utils.py +++ b/pyignite/utils.py @@ -105,7 +105,9 @@ def hashcode(data: Union[str, bytes, bytearray, memoryview]) -> int: def __hashcode_fallback(data: Union[str, bytes, bytearray, memoryview]) -> int: - if isinstance(data, str): + if data is None: + return 0 + elif isinstance(data, str): """ For strings we iterate over code point which are of the int type and can take up to 4 bytes and can only be positive. From cafe917684d85dc5ca11cdcff53c07248cff5528 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 19 Feb 2021 00:47:01 -0800 Subject: [PATCH 5/6] IGNITE-14211: review fix --- pyignite/client.py | 5 +---- pyignite/utils.py | 3 ++- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pyignite/client.py b/pyignite/client.py index 4da1831..9416474 100644 --- a/pyignite/client.py +++ b/pyignite/client.py @@ -583,10 +583,7 @@ def generate_result(value): conn = self.random_node - if isinstance(cache, Cache): - c_id = cache.cache_id - else: - c_id = cache_id(cache) + c_id = cache.cache_id if isinstance(cache, Cache) else cache_id(cache) if c_id != 0: schema = None diff --git a/pyignite/utils.py b/pyignite/utils.py index 8bd39a9..f1a7f90 100644 --- a/pyignite/utils.py +++ b/pyignite/utils.py @@ -107,7 +107,8 @@ def hashcode(data: Union[str, bytes, bytearray, memoryview]) -> int: def __hashcode_fallback(data: Union[str, bytes, bytearray, memoryview]) -> int: if data is None: return 0 - elif isinstance(data, str): + + if isinstance(data, str): """ For strings we iterate over code point which are of the int type and can take up to 4 bytes and can only be positive. 
From 952c83d465cf37b59782eb47808dc263a8213031 Mon Sep 17 00:00:00 2001 From: Igor Sapego Date: Fri, 19 Feb 2021 12:02:00 +0300 Subject: [PATCH 6/6] IGNITE-14211: Re-factoring --- tests/test_sql.py | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/tests/test_sql.py b/tests/test_sql.py index 6ddcc71..f25fedd 100644 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -245,26 +245,16 @@ def test_query_with_cache(client): ], }) - qry = f'select value from {table_name}' - cache.put(test_key, test_value) - page = client.sql(qry, schema=schema_name) - received = next(page)[0] - - assert test_value == received - - page = client.sql(qry, cache=cache) - received = next(page)[0] - - assert test_value == received - - page = client.sql(qry, cache=cache.name) - received = next(page)[0] - - assert test_value == received - - page = client.sql(qry, cache=cache.cache_id) - received = next(page)[0] + args_to_check = [ + ('schema', schema_name), + ('cache', cache), + ('cache', cache.name), + ('cache', cache.cache_id) + ] - assert test_value == received + for param, value in args_to_check: + page = client.sql(f'select value from {table_name}', **{param: value}) + received = next(page)[0] + assert test_value == received