diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 4f8c2e9905eb..5123e3574dd3 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1763,12 +1763,12 @@ Possible values: DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( Changes the behaviour of object storage cluster function or table. -ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table. +ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for a object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table. Restrictions: - Only applied for JOIN subqueries. -- Only if the FROM section uses a object storage cluster function ot table. +- Only if the FROM section uses a object storage cluster function or table. Possible values: diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 5f0a92e449b5..80b287723f7e 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -46,6 +46,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."}, {"allow_retries_in_cluster_requests", false, false, "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, + {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."}, {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."} }); diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index d4472e339edf..9be08214e1a7 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -163,7 +163,7 @@ enum class DistributedProductMode : uint8_t DECLARE_SETTING_ENUM(DistributedProductMode) -/// The setting for executing object storage cluster function ot table JOIN sections. +/// The setting for executing object storage cluster function or table JOIN sections. enum class ObjectStorageClusterJoinMode : uint8_t { LOCAL, /// Convert to local query diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index eed732ef6728..568865c881cf 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -1370,7 +1370,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent) /// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this the reason for the bug ^ in Distributed /// Hopefully there is no other case when we read from Distributed up to FetchColumns. - if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns) + if (table_node && table_node->getStorage()->isRemote()) updated_actions_dag_outputs.push_back(output_node); else if (table_function_node && table_function_node->getStorage()->isRemote()) updated_actions_dag_outputs.push_back(output_node); diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 79eca5401a15..7d0c6fb97177 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -31,7 +31,9 @@ #include #include #include +#include #include +#include #include #include @@ -112,7 +114,7 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {} + explicit SearcherVisitor(std::unordered_set types_, ContextPtr context) : Base(context), types(types_) {} bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/) { @@ -126,15 +128,20 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContextgetNodeType(); - if (node_type == type) + if (types.contains(node_type)) + { passed_node = node; + passed_type = node_type; + } } QueryTreeNodePtr getNode() const { return passed_node; } + std::optional getType() const { return passed_type; } private: - QueryTreeNodeType type; + std::unordered_set types; QueryTreeNodePtr passed_node; + std::optional passed_type; }; /* @@ -216,28 +223,44 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - auto modified_query_tree = query_tree->clone(); - bool need_modify = false; - - SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); - table_function_searcher.visit(query_tree); - auto table_function_node = table_function_searcher.getNode(); - if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + auto info = getQueryTreeInfo(query_tree, context); - if (has_join) + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) { - auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); - auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); - auto & table_function_ast = table_function->as(); - query_tree_distributed->setAlias(table_function_ast.alias); + auto modified_query_tree = query_tree->clone(); + + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + left_table_expression_searcher.visit(modified_query_tree); + auto table_function_node = left_table_expression_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + QueryTreeNodePtr query_tree_distributed; + + auto & query_node = modified_query_tree->as(); + + if (info.has_join) + { + auto join_node = query_node.getJoinTree(); + query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + } + else if (info.has_cross_join) + { + SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); + join_searcher.visit(modified_query_tree); + auto cross_join_node = join_searcher.getNode(); + if (!cross_join_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node"); + // CrossJoinNode contains vector of nodes. 0 is left expression, always exists. + query_tree_distributed = cross_join_node->as()->getTableExpressions()[0]->clone(); + } // Find add used columns from table function to make proper projection list + // Need to do before changing WHERE condition CollectUsedColumnsForSourceVisitor collector(table_function_node, context); - collector.visit(query_tree); + collector.visit(modified_query_tree); const auto & columns = collector.getColumns(); - auto & query_node = modified_query_tree->as(); query_node.resolveProjectionColumns(columns); auto column_nodes_to_select = std::make_shared(); column_nodes_to_select->getNodes().reserve(columns.size()); @@ -245,20 +268,26 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); query_node.getProjectionNode() = column_nodes_to_select; - // Left only table function to send on cluster nodes - modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + if (info.has_local_columns_in_where) + { + if (query_node.getPrewhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context); + if (query_node.getWhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context); + } - need_modify = true; - } + query_node.getOrderByNode() = std::make_shared(); + query_node.getGroupByNode() = std::make_shared(); - if (has_local_columns_in_where) - { - auto & query_node = modified_query_tree->as(); - query_node.getWhere() = {}; - } + if (query_tree_distributed) + { + // Left only table function to send on cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + } - if (need_modify) query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + } + return; } case ObjectStorageClusterJoinMode::GLOBAL: @@ -492,38 +521,59 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const pipeline.init(std::move(pipe)); } -QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( - ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const +IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context) { - auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + QueryTreeInfo info; - if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) - { - if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) - throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); + SearcherVisitor join_searcher({QueryTreeNodeType::JOIN, QueryTreeNodeType::CROSS_JOIN}, context); + join_searcher.visit(query_tree); - SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); - join_searcher.visit(query_info.query_tree); - if (join_searcher.getNode()) - has_join = true; + if (join_searcher.getNode()) + { + if (join_searcher.getType() == QueryTreeNodeType::JOIN) + info.has_join = true; + else + info.has_cross_join = true; + } - SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); - table_function_searcher.visit(query_info.query_tree); - auto table_function_node = table_function_searcher.getNode(); - if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + left_table_expression_searcher.visit(query_tree); + auto table_function_node = left_table_expression_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node"); + auto & query_node = query_tree->as(); + if (query_node.hasWhere() || query_node.hasPrewhere()) + { CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); - auto & query_node = query_info.query_tree->as(); + if (query_node.hasPrewhere()) + collector_where.visit(query_node.getPrewhere()); if (query_node.hasWhere()) collector_where.visit(query_node.getWhere()); - // Can't use 'WHERE' on remote node if it contains columns from other sources + // SELECT x FROM datalake.table WHERE x IN local.table. + // Need to modify 'WHERE' on remote node if it contains columns from other sources + // because remote node might not have those sources. if (!collector_where.getColumns().empty()) - has_local_columns_in_where = true; + info.has_local_columns_in_where = true; + } + + return info; +} + +QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const +{ + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + + if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) + { + if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); - if (has_join || has_local_columns_in_where) + auto info = getQueryTreeInfo(query_info.query_tree, context); + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) return QueryProcessingStage::Enum::FetchColumns; } diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 3f328d699a2d..b9f739ccad34 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -105,8 +105,14 @@ class IStorageCluster : public IStorage LoggerPtr log; String cluster_name; - mutable bool has_join = false; - mutable bool has_local_columns_in_where = false; + struct QueryTreeInfo + { + bool has_join = false; + bool has_cross_join = false; + bool has_local_columns_in_where = false; + }; + + static QueryTreeInfo getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context); }; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 77d2c25ab9a5..a07866e68cc6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -364,7 +364,7 @@ Chunk StorageObjectStorageSource::generate() { chunk.addColumn(constant_column.first, constant_column.second.name_and_type.type->createColumnConst( - chunk.getNumRows(), constant_column.second.value)); + chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst()); } #if USE_PARQUET && USE_AWS_S3 diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp index 8477798b62b1..064f538eeae7 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -26,6 +26,12 @@ ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query) return table_expression ? table_expression->table_function : nullptr; } +ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query) +{ + auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? table_expression->database_and_table_name : nullptr; +} + ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) { auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index 9834f3dc7573..2a845477df82 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -10,6 +10,7 @@ struct ASTTableExpression; ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); diff --git a/tests/integration/test_database_iceberg/configs/cluster.xml b/tests/integration/test_database_iceberg/configs/cluster.xml new file mode 100644 index 000000000000..b9638e40bc1e --- /dev/null +++ b/tests/integration/test_database_iceberg/configs/cluster.xml @@ -0,0 +1,12 @@ + + + + + + node1 + 9000 + + + + + \ No newline at end of file diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 8402af573265..74279463220b 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -14,12 +14,13 @@ import pytz from minio import Minio from pyiceberg.catalog import load_catalog -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import PartitionField, PartitionSpec, UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import DayTransform, IdentityTransform from pyiceberg.types import ( DoubleType, + LongType, FloatType, NestedField, StringType, @@ -27,6 +28,7 @@ TimestampType, TimestamptzType ) +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm from helpers.config_cluster import minio_secret_key, minio_access_key @@ -186,10 +188,11 @@ def started_cluster(): cluster = ClickHouseCluster(__file__) cluster.add_instance( "node1", - main_configs=["configs/backups.xml"], + main_configs=["configs/backups.xml", "configs/cluster.xml"], user_configs=[], stay_alive=True, with_iceberg_catalog=True, + with_zookeeper=True, ) logging.info("Starting cluster...") @@ -655,3 +658,140 @@ def test_table_with_slash(started_cluster): create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}` VALUES (NULL, 'AAPL', 193.24, 193.31, tuple('bot'));", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1}) assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}`") == "\\N\tAAPL\t193.24\t193.31\t('bot')\n" + + +def test_cluster_joins(started_cluster): + node = started_cluster.instances["node1"] + + test_ref = f"test_join_tables_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + table_name_2 = f"{test_ref}_table_2" + table_name_local = f"{test_ref}_table_local" + + root_namespace = f"{test_ref}_namespace" + + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(root_namespace) + + schema = Schema( + NestedField( + field_id=1, + name="tag", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="name", + field_type=StringType(), + required=False, + ), + ) + table = create_table(catalog, root_namespace, table_name, schema, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"tag": 1, "name": "John"}, {"tag": 2, "name": "Jack"}] + df = pa.Table.from_pylist(data) + table.append(df) + + schema2 = Schema( + NestedField( + field_id=1, + name="id", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="second_name", + field_type=StringType(), + required=False, + ), + ) + table2 = create_table(catalog, root_namespace, table_name_2, schema2, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"id": 1, "second_name": "Dow"}, {"id": 2, "second_name": "Sparrow"}] + df = pa.Table.from_pylist(data) + table2.append(df) + + node.query(f"CREATE TABLE `{table_name_local}` (id Int64, second_name String) ENGINE = Memory()") + node.query(f"INSERT INTO `{table_name_local}` VALUES (1, 'Silver'), (2, 'Black')") + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tSparrow\nJohn\tDow\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN `{table_name_local}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tBlack\nJohn\tSilver\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM `{table_name_local}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + CROSS JOIN `{table_name_local}` AS t2 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n" diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index a4a306f0d34c..e8d73c24dcbb 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1163,6 +1163,20 @@ def test_joins(started_cluster): res = list(map(str.split, result5.splitlines())) assert len(res) == 6 + result6 = node.query( + f""" + SELECT name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + WHERE value IN (SELECT id FROM join_table) + ORDER BY name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + res = list(map(str.split, result6.splitlines())) + assert len(res) == 25 + def test_graceful_shutdown(started_cluster): node = started_cluster.instances["s0_0_0"] diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 25d6f8f66f10..c9c8ad58eff1 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3508,6 +3508,151 @@ def compare_selects(query): compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE ((tag + length(name_old)) % 2 = 1) ORDER BY ALL") +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +def test_cluster_joins(started_cluster, storage_type): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_2 = "test_cluster_joins_2_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_LOCAL = "test_cluster_joins_local_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str, table_name): + return execute_spark_query_general( + spark, + started_cluster, + storage_type, + table_name, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, 'john'), + (2, 'jack') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME_2} ( + id INT, + second_name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME_2 + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME_2} VALUES + (1, 'dow'), + (2, 'sparrow') + """, TABLE_NAME_2 + ) + + creation_expression = get_creation_expression( + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=True + ) + + creation_expression_2 = get_creation_expression( + storage_type, TABLE_NAME_2, started_cluster, table_function=True, run_on_cluster=True + ) + + instance.query(f"CREATE TABLE `{TABLE_NAME_LOCAL}` (id Int64, second_name String) ENGINE = Memory()") + instance.query(f"INSERT INTO `{TABLE_NAME_LOCAL}` VALUES (1, 'silver'), (2, 'black')") + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN {creation_expression_2} AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tsparrow\njohn\tdow\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM {creation_expression_2} + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN `{TABLE_NAME_LOCAL}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tblack\njohn\tsilver\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM `{TABLE_NAME_LOCAL}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + CROSS JOIN `{TABLE_NAME_LOCAL}` AS t2 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tblack\njack\tsilver\njohn\tblack\njohn\tsilver\n" + + @pytest.mark.parametrize("storage_type", ["s3"]) def test_system_tables_partition_sorting_keys(started_cluster, storage_type): instance = started_cluster.instances["node1"]