From 38a45e4e46ceb2b2cf04ac9405535f0e181a8f12 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 14 Oct 2025 20:12:33 +0200 Subject: [PATCH 01/15] Join for tables --- src/Planner/PlannerJoinTree.cpp | 2 +- src/Storages/IStorageCluster.cpp | 51 +++++++++++++------ .../extractTableFunctionFromSelectQuery.cpp | 6 +++ .../extractTableFunctionFromSelectQuery.h | 1 + 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index eed732ef6728..568865c881cf 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -1370,7 +1370,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent) /// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this the reason for the bug ^ in Distributed /// Hopefully there is no other case when we read from Distributed up to FetchColumns. - if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns) + if (table_node && table_node->getStorage()->isRemote()) updated_actions_dag_outputs.push_back(output_node); else if (table_function_node && table_function_node->getStorage()->isRemote()) updated_actions_dag_outputs.push_back(output_node); diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 076ec5b5a28b..a507b3d85a19 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -112,7 +113,7 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {} + explicit SearcherVisitor(std::unordered_set types_, ContextPtr context) : Base(context), types(types_) {} bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/) { @@ -126,15 +127,20 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContextgetNodeType(); - if (node_type == type) + if (types.contains(node_type)) + { passed_node = node; + passed_type = node_type; + } } QueryTreeNodePtr getNode() const { return passed_node; } + std::optional getType() const { return passed_type; } private: - QueryTreeNodeType type; + std::unordered_set types; QueryTreeNodePtr passed_node; + std::optional passed_type; }; /* @@ -219,7 +225,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto modified_query_tree = query_tree->clone(); bool need_modify = false; - SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); table_function_searcher.visit(query_tree); auto table_function_node = table_function_searcher.getNode(); if (!table_function_node) @@ -227,17 +233,28 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( if (has_join) { - auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); - auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); - auto & table_function_ast = table_function->as(); - query_tree_distributed->setAlias(table_function_ast.alias); + QueryTreeNodePtr query_tree_distributed; + + auto & query_node = modified_query_tree->as(); + + if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION) + { + auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); + query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = table_function->as(); + query_tree_distributed->setAlias(table_function_ast.alias); + } + else + { + auto join_node = query_node.getJoinTree(); + query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + } // Find add used columns from table function to make proper projection list CollectUsedColumnsForSourceVisitor collector(table_function_node, context); collector.visit(query_tree); const auto & columns = collector.getColumns(); - auto & query_node = modified_query_tree->as(); query_node.resolveProjectionColumns(columns); auto column_nodes_to_select = std::make_shared(); column_nodes_to_select->getNodes().reserve(columns.size()); @@ -501,25 +518,27 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); - SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); + SearcherVisitor join_searcher({QueryTreeNodeType::JOIN}, context); join_searcher.visit(query_info.query_tree); if (join_searcher.getNode()) has_join = true; - SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); table_function_searcher.visit(query_info.query_tree); auto table_function_node = table_function_searcher.getNode(); if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node"); - CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); auto & query_node = query_info.query_tree->as(); if (query_node.hasWhere()) + { + CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); collector_where.visit(query_node.getWhere()); - // Can't use 'WHERE' on remote node if it contains columns from other sources - if (!collector_where.getColumns().empty()) - has_local_columns_in_where = true; + // Can't use 'WHERE' on remote node if it contains columns from other sources + if (!collector_where.getColumns().empty()) + has_local_columns_in_where = true; + } if (has_join || has_local_columns_in_where) return QueryProcessingStage::Enum::FetchColumns; diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp index 8477798b62b1..064f538eeae7 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -26,6 +26,12 @@ ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query) return table_expression ? table_expression->table_function : nullptr; } +ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query) +{ + auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? table_expression->database_and_table_name : nullptr; +} + ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) { auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index 9834f3dc7573..2a845477df82 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -10,6 +10,7 @@ struct ASTTableExpression; ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); From 43825ba9239471fcac2b3123d48db49a162d937e Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 15 Oct 2025 17:21:12 +0200 Subject: [PATCH 02/15] Some fixes --- src/Storages/IStorageCluster.cpp | 80 +++++++++------- tests/integration/test_s3_cluster/test.py | 14 +++ .../integration/test_storage_iceberg/test.py | 91 +++++++++++++++++++ 3 files changed, 152 insertions(+), 33 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index a507b3d85a19..85ca80c5d85a 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -222,37 +223,40 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - auto modified_query_tree = query_tree->clone(); - bool need_modify = false; + if (has_join || has_local_columns_in_where) + { + auto modified_query_tree = query_tree->clone(); - SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); - table_function_searcher.visit(query_tree); - auto table_function_node = table_function_searcher.getNode(); - if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + table_function_searcher.visit(modified_query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); - if (has_join) - { QueryTreeNodePtr query_tree_distributed; auto & query_node = modified_query_tree->as(); - if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION) - { - auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); - query_tree_distributed = buildTableFunctionQueryTree(table_function, context); - auto & table_function_ast = table_function->as(); - query_tree_distributed->setAlias(table_function_ast.alias); - } - else + if (has_join) { - auto join_node = query_node.getJoinTree(); - query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION) + { + auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); + query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = table_function->as(); + query_tree_distributed->setAlias(table_function_ast.alias); + } + else + { + auto join_node = query_node.getJoinTree(); + query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + } } // Find add used columns from table function to make proper projection list + // Need to do before changing WHERE condition CollectUsedColumnsForSourceVisitor collector(table_function_node, context); - collector.visit(query_tree); + collector.visit(modified_query_tree); const auto & columns = collector.getColumns(); query_node.resolveProjectionColumns(columns); @@ -262,20 +266,26 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); query_node.getProjectionNode() = column_nodes_to_select; - // Left only table function to send on cluster nodes - modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + if (has_local_columns_in_where) + { + if (query_node.getPrewhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context); + if (query_node.getWhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context); + } + + query_node.getOrderByNode() = std::make_shared(); + query_node.getGroupByNode() = std::make_shared(); - need_modify = true; - } + if (query_tree_distributed) + { + // Left only table function to send on cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + } - if (has_local_columns_in_where) - { - auto & query_node = modified_query_tree->as(); - query_node.getWhere() = {}; + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); } - if (need_modify) - query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); return; } case ObjectStorageClusterJoinMode::GLOBAL: @@ -530,12 +540,16 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node"); auto & query_node = query_info.query_tree->as(); - if (query_node.hasWhere()) + if (query_node.hasWhere() || query_node.hasPrewhere()) { CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); - collector_where.visit(query_node.getWhere()); + if (query_node.hasPrewhere()) + collector_where.visit(query_node.getPrewhere()); + if (query_node.hasWhere()) + collector_where.visit(query_node.getWhere()); - // Can't use 'WHERE' on remote node if it contains columns from other sources + // SELECT x FROM datalake.table WHERE x IN local.table + // Need to modify 'WHERE' on remote node if it contains columns from other sources if (!collector_where.getColumns().empty()) has_local_columns_in_where = true; } diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index a4a306f0d34c..e8d73c24dcbb 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1163,6 +1163,20 @@ def test_joins(started_cluster): res = list(map(str.split, result5.splitlines())) assert len(res) == 6 + result6 = node.query( + f""" + SELECT name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + WHERE value IN (SELECT id FROM join_table) + ORDER BY name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + res = list(map(str.split, result6.splitlines())) + assert len(res) == 25 + def test_graceful_shutdown(started_cluster): node = started_cluster.instances["s0_0_0"] diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index bcce89ac357a..d091512b6a40 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3492,3 +3492,94 @@ def compare_selects(query): compare_selects(f"SELECT _path,* FROM {creation_expression} ORDER BY ALL") compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE name_old='vasily' ORDER BY ALL") compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE ((tag + length(name_old)) % 2 = 1) ORDER BY ALL") + + +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +def test_cluster_joins(started_cluster, storage_type): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_2 = "test_cluster_joins_2_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str, table_name): + return execute_spark_query_general( + spark, + started_cluster, + storage_type, + table_name, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, 'john'), + (2, 'jack') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME_2} ( + id INT, + second_name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME_2 + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME_2} VALUES + (1, 'dow'), + (2, 'sparrow') + """, TABLE_NAME_2 + ) + + creation_expression = get_creation_expression( + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=True + ) + + creation_expression_2 = get_creation_expression( + storage_type, TABLE_NAME_2, started_cluster, table_function=True, run_on_cluster=True + ) + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN {creation_expression_2} AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tsparrow\njohn\tdow\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM {creation_expression_2} + ) + ORDER BY ALL + SETTINGS object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" From 4e9ac7e6d57279f910f8190776f6e1e1975468a5 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 15 Oct 2025 18:10:10 +0200 Subject: [PATCH 03/15] Add test for tables --- .../integration/test_database_iceberg/test.py | 87 ++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 6fa8113f5e37..81cc0d67ca68 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -14,12 +14,13 @@ import pytz from minio import Minio from pyiceberg.catalog import load_catalog -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import PartitionField, PartitionSpec, UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import DayTransform, IdentityTransform from pyiceberg.types import ( DoubleType, + LongType, FloatType, NestedField, StringType, @@ -27,6 +28,7 @@ TimestampType, TimestamptzType ) +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm from helpers.config_cluster import minio_secret_key, minio_access_key @@ -609,3 +611,86 @@ def test_table_with_slash(started_cluster): create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}` VALUES (NULL, 'AAPL', 193.24, 193.31, tuple('bot'));", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1}) assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}`") == "\\N\tAAPL\t193.24\t193.31\t('bot')\n" + + +def test_cluster_joins(started_cluster): + node = started_cluster.instances["node1"] + + test_ref = f"test_join_tables_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + table_name_2 = f"{test_ref}_table_2" + + root_namespace = f"{test_ref}_namespace" + + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(root_namespace) + + schema = Schema( + NestedField( + field_id=1, + name="tag", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="name", + field_type=StringType(), + required=False, + ), + ) + table = create_table(catalog, root_namespace, table_name, schema, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"tag": 1, "name": "John"}, {"tag": 2, "name": "Jack"}] + df = pa.Table.from_pylist(data) + table.append(df) + + schema2 = Schema( + NestedField( + field_id=1, + name="id", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="second_name", + field_type=StringType(), + required=False, + ), + ) + table2 = create_table(catalog, root_namespace, table_name_2, schema2, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"id": 1, "second_name": "Dow"}, {"id": 2, "second_name": "Sparrow"}] + df = pa.Table.from_pylist(data) + table2.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tSparrow\nJohn\tDow\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` + ) + ORDER BY ALL + SETTINGS object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" From 3dd9b46b087efa4c5912b81733f8261c0c8a3271 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 16 Oct 2025 11:47:16 +0200 Subject: [PATCH 04/15] Fix stateless tests --- src/Core/SettingsChangesHistory.cpp | 13 ++----------- src/Interpreters/InterpreterInsertQuery.cpp | 3 +++ 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 5915d3f3714d..5f6aa56e1929 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -48,6 +48,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_max_nodes", 0, 0, "New setting"}, {"allow_retries_in_cluster_requests", false, false, "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, + {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.8", { @@ -143,13 +145,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"}, {"allow_experimental_insert_into_iceberg", false, false, "New setting."}, /// RELEASE CLOSED - {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"}, - {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"}, - {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"}, - {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"}, - {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, - {"object_storage_cluster", "", "", "New setting"}, - {"object_storage_max_nodes", 0, 0, "New setting"}, }); addSettingsChanges(settings_changes_history, "25.6.5.2000", { @@ -157,10 +152,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"}, {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"}, {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"}, - {"object_storage_cluster", "", "", "New setting"}, - {"object_storage_max_nodes", 0, 0, "New setting"}, - {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, - {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 5b52cdbb9920..0a118070c5e6 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -772,6 +772,9 @@ InterpreterInsertQuery::distributedWriteIntoReplicatedMergeTreeFromClusterStorag if (!src_storage_cluster) return {}; + if (src_storage_cluster->getOriginalClusterName().empty()) + return {}; + if (!isInsertSelectTrivialEnoughForDistributedExecution(query)) return {}; From f5215f29c4f75776edea827dc54c7b9e50464f71 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 20 Oct 2025 14:21:24 +0200 Subject: [PATCH 05/15] Fix typo --- src/Core/Settings.cpp | 4 ++-- src/Core/SettingsEnums.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 8ef7a64bad9e..4bcb3f704efd 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1763,12 +1763,12 @@ Possible values: DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( Changes the behaviour of object storage cluster function or table. -ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table. +ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for a object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table. Restrictions: - Only applied for JOIN subqueries. -- Only if the FROM section uses a object storage cluster function ot table. +- Only if the FROM section uses a object storage cluster function or table. Possible values: diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index d4472e339edf..9be08214e1a7 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -163,7 +163,7 @@ enum class DistributedProductMode : uint8_t DECLARE_SETTING_ENUM(DistributedProductMode) -/// The setting for executing object storage cluster function ot table JOIN sections. +/// The setting for executing object storage cluster function or table JOIN sections. enum class ObjectStorageClusterJoinMode : uint8_t { LOCAL, /// Convert to local query From 023e38d6bb7af7cc7361fe01202acfbd4a9dd6ce Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 20 Oct 2025 18:45:53 +0200 Subject: [PATCH 06/15] Add tests with local table --- .../test_database_iceberg/configs/cluster.xml | 12 +++++ .../integration/test_database_iceberg/test.py | 47 +++++++++++++++++-- .../integration/test_storage_iceberg/test.py | 44 ++++++++++++++++- 3 files changed, 98 insertions(+), 5 deletions(-) create mode 100644 tests/integration/test_database_iceberg/configs/cluster.xml diff --git a/tests/integration/test_database_iceberg/configs/cluster.xml b/tests/integration/test_database_iceberg/configs/cluster.xml new file mode 100644 index 000000000000..b9638e40bc1e --- /dev/null +++ b/tests/integration/test_database_iceberg/configs/cluster.xml @@ -0,0 +1,12 @@ + + + + + + node1 + 9000 + + + + + \ No newline at end of file diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 81cc0d67ca68..b5004b395120 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -188,10 +188,11 @@ def started_cluster(): cluster = ClickHouseCluster(__file__) cluster.add_instance( "node1", - main_configs=["configs/backups.xml"], + main_configs=["configs/backups.xml", "configs/cluster.xml"], user_configs=[], stay_alive=True, with_iceberg_catalog=True, + with_zookeeper=True, ) logging.info("Starting cluster...") @@ -619,6 +620,7 @@ def test_cluster_joins(started_cluster): test_ref = f"test_join_tables_{uuid.uuid4()}" table_name = f"{test_ref}_table" table_name_2 = f"{test_ref}_table_2" + table_name_local = f"{test_ref}_table_local" root_namespace = f"{test_ref}_namespace" @@ -665,6 +667,9 @@ def test_cluster_joins(started_cluster): df = pa.Table.from_pylist(data) table2.append(df) + node.query(f"CREATE TABLE `{table_name_local}` (id Int64, second_name String) ENGINE = Memory()") + node.query(f"INSERT INTO `{table_name_local}` VALUES (1, 'Silver'), (2, 'Black')") + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) res = node.query( @@ -674,7 +679,9 @@ def test_cluster_joins(started_cluster): JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2 ON t1.tag=t2.id ORDER BY ALL - SETTINGS object_storage_cluster_join_mode='local' + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' """ ) @@ -689,7 +696,41 @@ def test_cluster_joins(started_cluster): FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` ) ORDER BY ALL - SETTINGS object_storage_cluster_join_mode='local' + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN `{table_name_local}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tBlack\nJohn\tSilver\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM `{table_name_local}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' """ ) diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index d091512b6a40..a25e747c119e 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3500,6 +3500,7 @@ def test_cluster_joins(started_cluster, storage_type): spark = started_cluster.spark_session TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str() TABLE_NAME_2 = "test_cluster_joins_2_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_LOCAL = "test_cluster_joins_local_" + storage_type + "_" + get_uuid_str() def execute_spark_query(query: str, table_name): return execute_spark_query_general( @@ -3556,6 +3557,9 @@ def execute_spark_query(query: str, table_name): storage_type, TABLE_NAME_2, started_cluster, table_function=True, run_on_cluster=True ) + instance.query(f"CREATE TABLE `{TABLE_NAME_LOCAL}` (id Int64, second_name String) ENGINE = Memory()") + instance.query(f"INSERT INTO `{TABLE_NAME_LOCAL}` VALUES (1, 'silver'), (2, 'black')") + res = instance.query( f""" SELECT t1.name,t2.second_name @@ -3563,7 +3567,9 @@ def execute_spark_query(query: str, table_name): JOIN {creation_expression_2} AS t2 ON t1.tag=t2.id ORDER BY ALL - SETTINGS object_storage_cluster_join_mode='local' + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' """ ) @@ -3578,7 +3584,41 @@ def execute_spark_query(query: str, table_name): FROM {creation_expression_2} ) ORDER BY ALL - SETTINGS object_storage_cluster_join_mode='local' + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN `{TABLE_NAME_LOCAL}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tblack\njohn\tsilver\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM `{TABLE_NAME_LOCAL}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' """ ) From bff8bbaf0dfe824cde521e1ed4bfea26f3dbf885 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 23 Oct 2025 13:50:50 +0200 Subject: [PATCH 07/15] ColumnConstable --- src/Columns/ColumnConstable.cpp | 227 ++++++++++++++++++ src/Columns/ColumnConstable.h | 398 ++++++++++++++++++++++++++++++++ src/Columns/IColumn.cpp | 2 + src/DataTypes/IDataType.cpp | 14 +- src/DataTypes/IDataType.h | 5 + 5 files changed, 645 insertions(+), 1 deletion(-) create mode 100644 src/Columns/ColumnConstable.cpp create mode 100644 src/Columns/ColumnConstable.h diff --git a/src/Columns/ColumnConstable.cpp b/src/Columns/ColumnConstable.cpp new file mode 100644 index 000000000000..6d9b78bb2881 --- /dev/null +++ b/src/Columns/ColumnConstable.cpp @@ -0,0 +1,227 @@ +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#if defined(MEMORY_SANITIZER) + #include +#endif + + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +extern const int NOT_IMPLEMENTED; +extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; +} + +ColumnConstable::ColumnConstable(const ColumnPtr & data_, size_t s_) + : data(data_), s(s_) +{ + /// Squash Constable of Const or Constable. + while (true) + { + if (const ColumnConstable * const_data = typeid_cast(data.get())) + { + data = const_data->getDataColumnPtr(); + continue; + } + if (const ColumnConst * const_data = typeid_cast(data.get())) + { + data = const_data->getDataColumnPtr(); + continue; + } + break; + } + + is_const = (data->size() == 1); + + /// Check that the value is initialized. We do it earlier, before it will be used, to ease debugging. +#if defined(MEMORY_SANITIZER) + if (data->isFixedAndContiguous()) + { + StringRef value = data->getDataAt(0); + __msan_check_mem_is_initialized(value.data, value.size); + } +#endif +} + +void ColumnConstable::convertDataToFullColumn() +{ + if (!is_const) + return; + if (s != 1) + data = data->replicate(Offsets(1, s)); + is_const = false; +} + +ColumnPtr ColumnConstable::convertToFullColumn() const +{ + if (!is_const || s == 1) + return data; + return data->replicate(Offsets(1, s)); +} + +ColumnPtr ColumnConstable::removeLowCardinality() const +{ + return ColumnConstable::create(data->convertToFullColumnIfLowCardinality(), s); +} + +ColumnPtr ColumnConstable::filter(const Filter & filt, ssize_t result_size_hint) const +{ + if (!is_const) + return data->filter(filt, result_size_hint); + + if (s != filt.size()) + throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of filter ({}) doesn't match size of column ({})", + filt.size(), toString(s)); + + size_t new_size = countBytesInFilter(filt); + return cloneResized(new_size); +} + +void ColumnConstable::expand(const Filter & mask, bool inverted) +{ + if (mask.size() < s) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Mask size should be no less than data size."); + + size_t bytes_count = countBytesInFilter(mask); + if (inverted) + bytes_count = mask.size() - bytes_count; + + if (bytes_count < s) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Not enough bytes in mask"); + if (bytes_count > s) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Too many bytes in mask"); + + s = mask.size(); +} + + +ColumnPtr ColumnConstable::replicate(const Offsets & offsets) const +{ + if (!is_const) + return data->replicate(offsets); + + if (s != offsets.size()) + throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of offsets ({}) doesn't match size of column ({})", + offsets.size(), toString(s)); + + size_t replicated_size = 0 == s ? 0 : offsets.back(); + return ColumnConst::create(data, replicated_size); +} + +ColumnPtr ColumnConstable::permute(const Permutation & perm, size_t limit) const +{ + if (!is_const) + return data->permute(perm, limit); + + limit = getLimitForPermutation(size(), perm.size(), limit); + return ColumnConst::create(data, limit); +} + +ColumnPtr ColumnConstable::index(const IColumn & indexes, size_t limit) const +{ + if (!is_const) + return data->index(indexes, limit); + + if (limit == 0) + limit = indexes.size(); + + if (indexes.size() < limit) + throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of indexes ({}) is less than required ({})", + indexes.size(), toString(limit)); + + return ColumnConst::create(data, limit); +} + +MutableColumns ColumnConstable::scatter(ColumnIndex num_columns, const Selector & selector) const +{ + if (!is_const) + return data->scatter(num_columns, selector); + + if (s != selector.size()) + throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of selector ({}) doesn't match size of column ({})", + selector.size(), toString(s)); + + std::vector counts = countColumnsSizeInSelector(num_columns, selector); + + MutableColumns res(num_columns); + for (size_t i = 0; i < num_columns; ++i) + res[i] = cloneResized(counts[i]); + + return res; +} + +void ColumnConstable::gather(ColumnGathererStream &) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot gather into constant column {}", getName()); +} + +void ColumnConstable::getPermutation(PermutationSortDirection /*direction*/, PermutationSortStability /*stability*/, + size_t /*limit*/, int /*nan_direction_hint*/, Permutation & res) const +{ + res.resize_exact(s); + iota(res.data(), s, IColumn::Permutation::value_type(0)); +} + +void ColumnConstable::updatePermutation(PermutationSortDirection /*direction*/, PermutationSortStability /*stability*/, + size_t, int, Permutation &, EqualRanges &) const +{ +} + +WeakHash32 ColumnConstable::getWeakHash32() const +{ + WeakHash32 element_hash = data->getWeakHash32(); + if (!is_const) + return element_hash; + return WeakHash32(s, element_hash.getData()[0]); +} + +void ColumnConstable::compareColumn( + const IColumn & rhs, + size_t rhs_row_num, + PaddedPODArray * row_indexes, + PaddedPODArray & compare_results, + int direction, + int nan_direction_hint) const +{ + if (!is_const) + return data->compareColumn(rhs, rhs_row_num, row_indexes, compare_results, direction, nan_direction_hint); + Int8 res = compareAt(1, 1, rhs, nan_direction_hint); + std::fill(compare_results.begin(), compare_results.end(), res); +} + +ColumnConstable::Ptr createColumnConstable(const ColumnPtr & column, Field value) +{ + auto data = column->cloneEmpty(); + data->insert(value); + return ColumnConstable::create(std::move(data), 1); +} + +ColumnConstable::Ptr createColumnConstable(const ColumnPtr & column, size_t const_value_index) +{ + auto data = column->cloneEmpty(); + data->insertFrom(*column, const_value_index); + return ColumnConstable::create(std::move(data), 1); +} + +ColumnConstable::Ptr createColumnConstableWithDefaultValue(const ColumnPtr & column) +{ + auto data = column->cloneEmpty(); + data->insertDefault(); + return ColumnConstable::create(std::move(data), 1); +} + + +} diff --git a/src/Columns/ColumnConstable.h b/src/Columns/ColumnConstable.h new file mode 100644 index 000000000000..506e9d6a0fb2 --- /dev/null +++ b/src/Columns/ColumnConstable.h @@ -0,0 +1,398 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +/** ColumnConstable contains another column with single element, + * but can convert it to full when different value inserted. + */ +class ColumnConstable final : public COWHelper, ColumnConstable> +{ +private: + friend class COWHelper, ColumnConstable>; + + WrappedPtr data; + size_t s; + bool is_const; + + ColumnConstable(const ColumnPtr & data, size_t s_); + ColumnConstable(const ColumnConstable & src) = default; + + void convertDataToFullColumn(); + +public: + bool isConst() const override { return false; } + + ColumnPtr convertToFullColumn() const; + + ColumnPtr convertToFullColumnIfConst() const override + { + return convertToFullColumn(); + } + + ColumnPtr removeLowCardinality() const; + + std::string getName() const override + { + return "Constable(" + data->getName() + ")"; + } + + const char * getFamilyName() const override + { + return "Constable"; + } + + TypeIndex getDataType() const override + { + return data->getDataType(); + } + + MutableColumnPtr cloneResized(size_t new_size) const override + { + if (!is_const) + return ColumnConstable::create(data->cloneResized(new_size), new_size); + return ColumnConstable::create(data, new_size); + } + + size_t size() const override + { + return s; + } + + Field operator[](size_t n) const override + { + return (*data)[is_const ? 0 : n]; + } + + void get(size_t n, Field & res) const override + { + data->get(is_const ? 0 : n, res); + } + + std::pair getValueNameAndType(size_t n) const override + { + return data->getValueNameAndType(is_const ? 0 : n); + } + + StringRef getDataAt(size_t n) const override + { + return data->getDataAt(is_const ? 0 : n); + } + + UInt64 get64(size_t n) const override + { + return data->get64(is_const ? 0 : n); + } + + UInt64 getUInt(size_t n) const override + { + return data->getUInt(is_const ? 0 : n); + } + + Int64 getInt(size_t n) const override + { + return data->getInt(is_const ? 0 : n); + } + + bool getBool(size_t n) const override + { + return data->getBool(is_const ? 0 : n); + } + + Float64 getFloat64(size_t n) const override + { + return data->getFloat64(is_const ? 0 : n); + } + + Float32 getFloat32(size_t n) const override + { + return data->getFloat32(is_const ? 0 : n); + } + + bool isDefaultAt(size_t n) const override + { + return data->isDefaultAt(is_const ? 0 : n); + } + + bool isNullAt(size_t n) const override + { + return data->isNullAt(is_const ? 0 : n); + } + +#if !defined(DEBUG_OR_SANITIZER_BUILD) + void insertRangeFrom(const IColumn & src, size_t start, size_t length) override +#else + void doInsertRangeFrom(const IColumn & src, size_t start, size_t length) override +#endif + { + if (length == 0) + return; + //if (s > 0 && (!is_const || !src.hasEqualValues() || src.getDataAt(0) != getDataAt(0))) + { + convertDataToFullColumn(); + auto src_full = src.convertToFullColumnIfConst(); +#if !defined(DEBUG_OR_SANITIZER_BUILD) + data->insertRangeFrom(*src_full, start, length); +#else + data->doInsertRangeFrom(*src_full, start, length); +#endif + } + s += length; + } + + void insert(const Field & field) override + { + convertDataToFullColumn(); + data->insert(field); + ++s; + } + + bool tryInsert(const Field & field) override + { + convertDataToFullColumn(); + if (!data->tryInsert(field)) + return false; + ++s; + return true; + } + + void insertData(const char * pos, size_t length) override + { + convertDataToFullColumn(); + data->insertData(pos, length); + ++s; + } + +#if !defined(DEBUG_OR_SANITIZER_BUILD) + void insertFrom(const IColumn & src, size_t position) override +#else + void doInsertFrom(const IColumn & src, size_t position) override +#endif + { + convertDataToFullColumn(); +#if !defined(DEBUG_OR_SANITIZER_BUILD) + data->insertFrom(src, position); +#else + data->doInsertFrom(src, position); +#endif + ++s; + } + +#if !defined(DEBUG_OR_SANITIZER_BUILD) + void insertManyFrom(const IColumn & src, size_t position, size_t length) override +#else + void doInsertManyFrom(const IColumn & src, size_t position, size_t length) override +#endif + { + convertDataToFullColumn(); +#if !defined(DEBUG_OR_SANITIZER_BUILD) + data->insertManyFrom(src, position, length); +#else + data->doInsertManyFrom(src, position, length); +#endif + s += length; + } + + void insertDefault() override + { + convertDataToFullColumn(); + data->insertDefault(); + ++s; + } + + void popBack(size_t n) override + { + if (!is_const) + data->popBack(n); + s -= n; + } + + StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override + { + return data->serializeValueIntoArena(is_const ? 0 : n, arena, begin); + } + + char * serializeValueIntoMemory(size_t n, char * memory) const override + { + return data->serializeValueIntoMemory(is_const ? 0 : n, memory); + } + + const char * deserializeAndInsertFromArena(const char * pos) override + { + const auto * res = data->deserializeAndInsertFromArena(pos); + if (is_const) + data->popBack(1); + ++s; + return res; + } + + const char * skipSerializedInArena(const char * pos) const override + { + return data->skipSerializedInArena(pos); + } + + void updateHashWithValue(size_t n, SipHash & hash) const override + { + data->updateHashWithValue(is_const ? 0 : n, hash); + } + + WeakHash32 getWeakHash32() const override; + + void updateHashFast(SipHash & hash) const override + { + data->updateHashFast(hash); + } + + ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override; + void expand(const Filter & mask, bool inverted) override; + + ColumnPtr replicate(const Offsets & offsets) const override; + ColumnPtr permute(const Permutation & perm, size_t limit) const override; + ColumnPtr index(const IColumn & indexes, size_t limit) const override; + void getPermutation(PermutationSortDirection direction, PermutationSortStability stability, + size_t limit, int nan_direction_hint, Permutation & res) const override; + void updatePermutation(PermutationSortDirection direction, PermutationSortStability stability, + size_t limit, int nan_direction_hint, Permutation & res, EqualRanges & equal_ranges) const override; + + size_t byteSize() const override + { + return data->byteSize() + sizeof(s) + sizeof(is_const); + } + + size_t byteSizeAt(size_t n) const override + { + return data->byteSizeAt(is_const ? 0 : n); + } + + size_t allocatedBytes() const override + { + return data->allocatedBytes() + sizeof(s) + sizeof(is_const); + } + +#if !defined(DEBUG_OR_SANITIZER_BUILD) + int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override +#else + int doCompareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override +#endif + { + if (!is_const) + return data->compareAt(n, m, rhs, nan_direction_hint); + return data->compareAt(0, 0, *assert_cast(rhs).data, nan_direction_hint); + } + + void compareColumn(const IColumn & rhs, size_t rhs_row_num, + PaddedPODArray * row_indexes, PaddedPODArray & compare_results, + int direction, int nan_direction_hint) const override; + + bool hasEqualValues() const override + { + if (is_const) + return true; + return data->hasEqualValues(); + } + + MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override; + + void gather(ColumnGathererStream &) override; + + void getExtremes(Field & min, Field & max) const override + { + data->getExtremes(min, max); + } + + void forEachSubcolumn(ColumnCallback callback) const override + { + callback(data); + } + + void forEachSubcolumnRecursively(RecursiveColumnCallback callback) const override + { + callback(*data); + data->forEachSubcolumnRecursively(callback); + } + + void forEachMutableSubcolumn(MutableColumnCallback callback) override + { + callback(data); + } + + void forEachMutableSubcolumnRecursively(RecursiveMutableColumnCallback callback) override + { + callback(*data); + data->forEachMutableSubcolumnRecursively(callback); + } + + bool structureEquals(const IColumn & rhs) const override + { + if (const auto * rhs_concrete = typeid_cast(&rhs)) + return data->structureEquals(*rhs_concrete->data); + return false; + } + + double getRatioOfDefaultRows(double sample_ratio) const override + { + if (!is_const) + return data->getRatioOfDefaultRows(sample_ratio); + return data->isDefaultAt(0) ? 1.0 : 0.0; + } + + UInt64 getNumberOfDefaultRows() const override + { + if (!is_const) + return data->getNumberOfDefaultRows(); + return data->isDefaultAt(0) ? s : 0; + } + + void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override + { + if (!is_const) + { + data->getIndicesOfNonDefaultRows(indices, from, limit); + return; + } + if (!data->isDefaultAt(0)) + { + size_t to = limit && from + limit < size() ? from + limit : size(); + indices.reserve_exact(indices.size() + to - from); + for (size_t i = from; i < to; ++i) + indices.push_back(i); + } + } + + bool isNullable() const override { return isColumnNullable(*data); } + bool onlyNull() const override + { + if (!is_const) + return data->onlyNull(); + return data->isNullAt(0); + } + + bool isNumeric() const override { return data->isNumeric(); } + bool isFixedAndContiguous() const override { return data->isFixedAndContiguous(); } + bool valuesHaveFixedSize() const override { return data->valuesHaveFixedSize(); } + size_t sizeOfValueIfFixed() const override { return data->sizeOfValueIfFixed(); } + std::string_view getRawData() const override { return data->getRawData(); } + + /// Not part of the common interface. + + const ColumnPtr & getDataColumnPtr() const { return data; } + + bool isCollationSupported() const override { return data->isCollationSupported(); } + + bool hasDynamicStructure() const override { return data->hasDynamicStructure(); } +}; + +ColumnConstable::Ptr createColumnConstable(const ColumnPtr & column, Field value); +ColumnConstable::Ptr createColumnConstable(const ColumnPtr & column, size_t const_value_index); +ColumnConstable::Ptr createColumnConstableWithDefaultValue(const ColumnPtr &column); + + +} diff --git a/src/Columns/IColumn.cpp b/src/Columns/IColumn.cpp index 5c926100e044..1277c37f4767 100644 --- a/src/Columns/IColumn.cpp +++ b/src/Columns/IColumn.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -827,6 +828,7 @@ template class IColumnHelper; template class IColumnHelper; template class IColumnHelper; template class IColumnHelper; +template class IColumnHelper; template class IColumnHelper; template class IColumnHelper; template class IColumnHelper; diff --git a/src/DataTypes/IDataType.cpp b/src/DataTypes/IDataType.cpp index 27031b2aaae1..ba0dd872d4f4 100644 --- a/src/DataTypes/IDataType.cpp +++ b/src/DataTypes/IDataType.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -91,12 +92,23 @@ ColumnPtr IDataType::createColumnConst(size_t size, const Field & field) const return ColumnConst::create(std::move(column), size); } - ColumnPtr IDataType::createColumnConstWithDefaultValue(size_t size) const { return createColumnConst(size, getDefault()); } +ColumnPtr IDataType::createColumnConstable(size_t size, const Field & field) const +{ + auto column = createColumn(); + column->insert(field); + return ColumnConstable::create(std::move(column), size); +} + +ColumnPtr IDataType::createColumnConstableWithDefaultValue(size_t size) const +{ + return createColumnConstable(size, getDefault()); +} + DataTypePtr IDataType::promoteNumericType() const { throw Exception(ErrorCodes::DATA_TYPE_CANNOT_BE_PROMOTED, "Data type {} can't be promoted.", getName()); diff --git a/src/DataTypes/IDataType.h b/src/DataTypes/IDataType.h index c9b0b6193024..c59c2b403959 100644 --- a/src/DataTypes/IDataType.h +++ b/src/DataTypes/IDataType.h @@ -176,6 +176,11 @@ class IDataType : private boost::noncopyable, public std::enable_shared_from_thi virtual ColumnPtr createColumnConst(size_t size, const Field & field) const; ColumnPtr createColumnConstWithDefaultValue(size_t size) const; + /** Create ColumnConst for corresponding type, with specified size and value. + */ + virtual ColumnPtr createColumnConstable(size_t size, const Field & field) const; + ColumnPtr createColumnConstableWithDefaultValue(size_t size) const; + /** Get default value of data type. * It is the "default" default, regardless the fact that a table could contain different user-specified default. */ From e143307fb47a8f83b2f8fc9fd3d40a1e107e3edc Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 24 Oct 2025 13:00:44 +0200 Subject: [PATCH 08/15] Fix build with sanitizer --- src/Columns/ColumnConstable.h | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/Columns/ColumnConstable.h b/src/Columns/ColumnConstable.h index 506e9d6a0fb2..8e6c7cbbbb91 100644 --- a/src/Columns/ColumnConstable.h +++ b/src/Columns/ColumnConstable.h @@ -139,11 +139,7 @@ class ColumnConstable final : public COWHelper, C { convertDataToFullColumn(); auto src_full = src.convertToFullColumnIfConst(); -#if !defined(DEBUG_OR_SANITIZER_BUILD) data->insertRangeFrom(*src_full, start, length); -#else - data->doInsertRangeFrom(*src_full, start, length); -#endif } s += length; } @@ -178,11 +174,7 @@ class ColumnConstable final : public COWHelper, C #endif { convertDataToFullColumn(); -#if !defined(DEBUG_OR_SANITIZER_BUILD) data->insertFrom(src, position); -#else - data->doInsertFrom(src, position); -#endif ++s; } @@ -193,11 +185,7 @@ class ColumnConstable final : public COWHelper, C #endif { convertDataToFullColumn(); -#if !defined(DEBUG_OR_SANITIZER_BUILD) data->insertManyFrom(src, position, length); -#else - data->doInsertManyFrom(src, position, length); -#endif s += length; } From 70805a84ea947c967d10a932465212b4d9b86e93 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 5 Nov 2025 15:51:07 +0100 Subject: [PATCH 09/15] Forgotten change --- src/Storages/ObjectStorage/StorageObjectStorageSource.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 77d2c25ab9a5..f1f84c75a8cd 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -363,7 +363,7 @@ Chunk StorageObjectStorageSource::generate() for (const auto & constant_column : reader.constant_columns_with_values) { chunk.addColumn(constant_column.first, - constant_column.second.name_and_type.type->createColumnConst( + constant_column.second.name_and_type.type->createColumnConstable( chunk.getNumRows(), constant_column.second.value)); } From 39882f53c4db00b12374f8cc0308508e390b4f1e Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 5 Nov 2025 18:16:18 +0100 Subject: [PATCH 10/15] Iceberg cross join --- src/Storages/IStorageCluster.cpp | 26 ++++++++++++++++++++------ src/Storages/IStorageCluster.h | 1 + 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 85ca80c5d85a..8c0e51be4e33 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -223,7 +223,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - if (has_join || has_local_columns_in_where) + if (has_join || has_cross_join || has_local_columns_in_where) { auto modified_query_tree = query_tree->clone(); @@ -237,7 +237,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto & query_node = modified_query_tree->as(); - if (has_join) + if (has_join || has_cross_join) { if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION) { @@ -246,11 +246,20 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto & table_function_ast = table_function->as(); query_tree_distributed->setAlias(table_function_ast.alias); } - else + else if (has_join) { auto join_node = query_node.getJoinTree(); query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); } + else + { + SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); + join_searcher.visit(modified_query_tree); + auto cross_join_node = join_searcher.getNode(); + if (!cross_join_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node"); + query_tree_distributed = cross_join_node->as()->getTableExpressions()[0]->clone(); + } } // Find add used columns from table function to make proper projection list @@ -528,10 +537,15 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); - SearcherVisitor join_searcher({QueryTreeNodeType::JOIN}, context); + SearcherVisitor join_searcher({QueryTreeNodeType::JOIN, QueryTreeNodeType::CROSS_JOIN}, context); join_searcher.visit(query_info.query_tree); if (join_searcher.getNode()) - has_join = true; + { + if (join_searcher.getType() == QueryTreeNodeType::JOIN) + has_join = true; + else + has_cross_join = true; + } SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); table_function_searcher.visit(query_info.query_tree); @@ -554,7 +568,7 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( has_local_columns_in_where = true; } - if (has_join || has_local_columns_in_where) + if (has_join || has_cross_join || has_local_columns_in_where) return QueryProcessingStage::Enum::FetchColumns; } diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 362f938d120b..f560a91da36f 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -104,6 +104,7 @@ class IStorageCluster : public IStorage String cluster_name; mutable bool has_join = false; + mutable bool has_cross_join = false; mutable bool has_local_columns_in_where = false; }; From 9ce6b71feb1454c88161e21b87f1803c99074842 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 11 Nov 2025 14:19:01 +0100 Subject: [PATCH 11/15] Fix info about joins --- src/Storages/IStorageCluster.cpp | 83 ++++++++++++++++++-------------- src/Storages/IStorageCluster.h | 11 +++-- 2 files changed, 55 insertions(+), 39 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 8c0e51be4e33..9643bf87992c 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -223,7 +223,9 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - if (has_join || has_cross_join || has_local_columns_in_where) + auto info = getQueryTreeInfo(query_tree, context); + + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) { auto modified_query_tree = query_tree->clone(); @@ -237,7 +239,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto & query_node = modified_query_tree->as(); - if (has_join || has_cross_join) + if (info.has_join || info.has_cross_join) { if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION) { @@ -246,7 +248,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto & table_function_ast = table_function->as(); query_tree_distributed->setAlias(table_function_ast.alias); } - else if (has_join) + else if (info.has_join) { auto join_node = query_node.getJoinTree(); query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); @@ -275,7 +277,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); query_node.getProjectionNode() = column_nodes_to_select; - if (has_local_columns_in_where) + if (info.has_local_columns_in_where) { if (query_node.getPrewhere()) removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context); @@ -526,6 +528,45 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const pipeline.init(std::move(pipe)); } +IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context) +{ + QueryTreeInfo info; + + SearcherVisitor join_searcher({QueryTreeNodeType::JOIN, QueryTreeNodeType::CROSS_JOIN}, context); + join_searcher.visit(query_tree); + + if (join_searcher.getNode()) + { + if (join_searcher.getType() == QueryTreeNodeType::JOIN) + info.has_join = true; + else + info.has_cross_join = true; + } + + SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + table_function_searcher.visit(query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node"); + + auto & query_node = query_tree->as(); + if (query_node.hasWhere() || query_node.hasPrewhere()) + { + CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); + if (query_node.hasPrewhere()) + collector_where.visit(query_node.getPrewhere()); + if (query_node.hasWhere()) + collector_where.visit(query_node.getWhere()); + + // SELECT x FROM datalake.table WHERE x IN local.table + // Need to modify 'WHERE' on remote node if it contains columns from other sources + if (!collector_where.getColumns().empty()) + info.has_local_columns_in_where = true; + } + + return info; +} + QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const { @@ -537,38 +578,8 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); - SearcherVisitor join_searcher({QueryTreeNodeType::JOIN, QueryTreeNodeType::CROSS_JOIN}, context); - join_searcher.visit(query_info.query_tree); - if (join_searcher.getNode()) - { - if (join_searcher.getType() == QueryTreeNodeType::JOIN) - has_join = true; - else - has_cross_join = true; - } - - SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); - table_function_searcher.visit(query_info.query_tree); - auto table_function_node = table_function_searcher.getNode(); - if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node"); - - auto & query_node = query_info.query_tree->as(); - if (query_node.hasWhere() || query_node.hasPrewhere()) - { - CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); - if (query_node.hasPrewhere()) - collector_where.visit(query_node.getPrewhere()); - if (query_node.hasWhere()) - collector_where.visit(query_node.getWhere()); - - // SELECT x FROM datalake.table WHERE x IN local.table - // Need to modify 'WHERE' on remote node if it contains columns from other sources - if (!collector_where.getColumns().empty()) - has_local_columns_in_where = true; - } - - if (has_join || has_cross_join || has_local_columns_in_where) + auto info = getQueryTreeInfo(query_info.query_tree, context); + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) return QueryProcessingStage::Enum::FetchColumns; } diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index f560a91da36f..59d67ce9cddf 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -103,9 +103,14 @@ class IStorageCluster : public IStorage LoggerPtr log; String cluster_name; - mutable bool has_join = false; - mutable bool has_cross_join = false; - mutable bool has_local_columns_in_where = false; + struct QueryTreeInfo + { + bool has_join = false; + bool has_cross_join = false; + bool has_local_columns_in_where = false; + }; + + static QueryTreeInfo getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context); }; From 239cf4cd59a1aff7f7f196874310abbb719c2500 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 11 Nov 2025 22:03:52 +0100 Subject: [PATCH 12/15] Turn off contable column --- src/Storages/ObjectStorage/StorageObjectStorageSource.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index f1f84c75a8cd..a07866e68cc6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -363,8 +363,8 @@ Chunk StorageObjectStorageSource::generate() for (const auto & constant_column : reader.constant_columns_with_values) { chunk.addColumn(constant_column.first, - constant_column.second.name_and_type.type->createColumnConstable( - chunk.getNumRows(), constant_column.second.value)); + constant_column.second.name_and_type.type->createColumnConst( + chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst()); } #if USE_PARQUET && USE_AWS_S3 From 154e5fe6967ab4cf42d9ebbf116e6a8db8318046 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 13 Nov 2025 11:46:15 +0100 Subject: [PATCH 13/15] Revert "ColumnConstable" This reverts commit bff8bbaf0dfe824cde521e1ed4bfea26f3dbf885. --- src/Columns/ColumnConstable.cpp | 227 ------------------- src/Columns/ColumnConstable.h | 386 -------------------------------- src/Columns/IColumn.cpp | 2 - src/DataTypes/IDataType.cpp | 14 +- src/DataTypes/IDataType.h | 5 - 5 files changed, 1 insertion(+), 633 deletions(-) delete mode 100644 src/Columns/ColumnConstable.cpp delete mode 100644 src/Columns/ColumnConstable.h diff --git a/src/Columns/ColumnConstable.cpp b/src/Columns/ColumnConstable.cpp deleted file mode 100644 index 6d9b78bb2881..000000000000 --- a/src/Columns/ColumnConstable.cpp +++ /dev/null @@ -1,227 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include -#include - -#include - -#if defined(MEMORY_SANITIZER) - #include -#endif - - -namespace DB -{ - -namespace ErrorCodes -{ -extern const int LOGICAL_ERROR; -extern const int NOT_IMPLEMENTED; -extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; -} - -ColumnConstable::ColumnConstable(const ColumnPtr & data_, size_t s_) - : data(data_), s(s_) -{ - /// Squash Constable of Const or Constable. - while (true) - { - if (const ColumnConstable * const_data = typeid_cast(data.get())) - { - data = const_data->getDataColumnPtr(); - continue; - } - if (const ColumnConst * const_data = typeid_cast(data.get())) - { - data = const_data->getDataColumnPtr(); - continue; - } - break; - } - - is_const = (data->size() == 1); - - /// Check that the value is initialized. We do it earlier, before it will be used, to ease debugging. -#if defined(MEMORY_SANITIZER) - if (data->isFixedAndContiguous()) - { - StringRef value = data->getDataAt(0); - __msan_check_mem_is_initialized(value.data, value.size); - } -#endif -} - -void ColumnConstable::convertDataToFullColumn() -{ - if (!is_const) - return; - if (s != 1) - data = data->replicate(Offsets(1, s)); - is_const = false; -} - -ColumnPtr ColumnConstable::convertToFullColumn() const -{ - if (!is_const || s == 1) - return data; - return data->replicate(Offsets(1, s)); -} - -ColumnPtr ColumnConstable::removeLowCardinality() const -{ - return ColumnConstable::create(data->convertToFullColumnIfLowCardinality(), s); -} - -ColumnPtr ColumnConstable::filter(const Filter & filt, ssize_t result_size_hint) const -{ - if (!is_const) - return data->filter(filt, result_size_hint); - - if (s != filt.size()) - throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of filter ({}) doesn't match size of column ({})", - filt.size(), toString(s)); - - size_t new_size = countBytesInFilter(filt); - return cloneResized(new_size); -} - -void ColumnConstable::expand(const Filter & mask, bool inverted) -{ - if (mask.size() < s) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Mask size should be no less than data size."); - - size_t bytes_count = countBytesInFilter(mask); - if (inverted) - bytes_count = mask.size() - bytes_count; - - if (bytes_count < s) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Not enough bytes in mask"); - if (bytes_count > s) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Too many bytes in mask"); - - s = mask.size(); -} - - -ColumnPtr ColumnConstable::replicate(const Offsets & offsets) const -{ - if (!is_const) - return data->replicate(offsets); - - if (s != offsets.size()) - throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of offsets ({}) doesn't match size of column ({})", - offsets.size(), toString(s)); - - size_t replicated_size = 0 == s ? 0 : offsets.back(); - return ColumnConst::create(data, replicated_size); -} - -ColumnPtr ColumnConstable::permute(const Permutation & perm, size_t limit) const -{ - if (!is_const) - return data->permute(perm, limit); - - limit = getLimitForPermutation(size(), perm.size(), limit); - return ColumnConst::create(data, limit); -} - -ColumnPtr ColumnConstable::index(const IColumn & indexes, size_t limit) const -{ - if (!is_const) - return data->index(indexes, limit); - - if (limit == 0) - limit = indexes.size(); - - if (indexes.size() < limit) - throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of indexes ({}) is less than required ({})", - indexes.size(), toString(limit)); - - return ColumnConst::create(data, limit); -} - -MutableColumns ColumnConstable::scatter(ColumnIndex num_columns, const Selector & selector) const -{ - if (!is_const) - return data->scatter(num_columns, selector); - - if (s != selector.size()) - throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Size of selector ({}) doesn't match size of column ({})", - selector.size(), toString(s)); - - std::vector counts = countColumnsSizeInSelector(num_columns, selector); - - MutableColumns res(num_columns); - for (size_t i = 0; i < num_columns; ++i) - res[i] = cloneResized(counts[i]); - - return res; -} - -void ColumnConstable::gather(ColumnGathererStream &) -{ - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot gather into constant column {}", getName()); -} - -void ColumnConstable::getPermutation(PermutationSortDirection /*direction*/, PermutationSortStability /*stability*/, - size_t /*limit*/, int /*nan_direction_hint*/, Permutation & res) const -{ - res.resize_exact(s); - iota(res.data(), s, IColumn::Permutation::value_type(0)); -} - -void ColumnConstable::updatePermutation(PermutationSortDirection /*direction*/, PermutationSortStability /*stability*/, - size_t, int, Permutation &, EqualRanges &) const -{ -} - -WeakHash32 ColumnConstable::getWeakHash32() const -{ - WeakHash32 element_hash = data->getWeakHash32(); - if (!is_const) - return element_hash; - return WeakHash32(s, element_hash.getData()[0]); -} - -void ColumnConstable::compareColumn( - const IColumn & rhs, - size_t rhs_row_num, - PaddedPODArray * row_indexes, - PaddedPODArray & compare_results, - int direction, - int nan_direction_hint) const -{ - if (!is_const) - return data->compareColumn(rhs, rhs_row_num, row_indexes, compare_results, direction, nan_direction_hint); - Int8 res = compareAt(1, 1, rhs, nan_direction_hint); - std::fill(compare_results.begin(), compare_results.end(), res); -} - -ColumnConstable::Ptr createColumnConstable(const ColumnPtr & column, Field value) -{ - auto data = column->cloneEmpty(); - data->insert(value); - return ColumnConstable::create(std::move(data), 1); -} - -ColumnConstable::Ptr createColumnConstable(const ColumnPtr & column, size_t const_value_index) -{ - auto data = column->cloneEmpty(); - data->insertFrom(*column, const_value_index); - return ColumnConstable::create(std::move(data), 1); -} - -ColumnConstable::Ptr createColumnConstableWithDefaultValue(const ColumnPtr & column) -{ - auto data = column->cloneEmpty(); - data->insertDefault(); - return ColumnConstable::create(std::move(data), 1); -} - - -} diff --git a/src/Columns/ColumnConstable.h b/src/Columns/ColumnConstable.h deleted file mode 100644 index 8e6c7cbbbb91..000000000000 --- a/src/Columns/ColumnConstable.h +++ /dev/null @@ -1,386 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -#include - -namespace DB -{ - -/** ColumnConstable contains another column with single element, - * but can convert it to full when different value inserted. - */ -class ColumnConstable final : public COWHelper, ColumnConstable> -{ -private: - friend class COWHelper, ColumnConstable>; - - WrappedPtr data; - size_t s; - bool is_const; - - ColumnConstable(const ColumnPtr & data, size_t s_); - ColumnConstable(const ColumnConstable & src) = default; - - void convertDataToFullColumn(); - -public: - bool isConst() const override { return false; } - - ColumnPtr convertToFullColumn() const; - - ColumnPtr convertToFullColumnIfConst() const override - { - return convertToFullColumn(); - } - - ColumnPtr removeLowCardinality() const; - - std::string getName() const override - { - return "Constable(" + data->getName() + ")"; - } - - const char * getFamilyName() const override - { - return "Constable"; - } - - TypeIndex getDataType() const override - { - return data->getDataType(); - } - - MutableColumnPtr cloneResized(size_t new_size) const override - { - if (!is_const) - return ColumnConstable::create(data->cloneResized(new_size), new_size); - return ColumnConstable::create(data, new_size); - } - - size_t size() const override - { - return s; - } - - Field operator[](size_t n) const override - { - return (*data)[is_const ? 0 : n]; - } - - void get(size_t n, Field & res) const override - { - data->get(is_const ? 0 : n, res); - } - - std::pair getValueNameAndType(size_t n) const override - { - return data->getValueNameAndType(is_const ? 0 : n); - } - - StringRef getDataAt(size_t n) const override - { - return data->getDataAt(is_const ? 0 : n); - } - - UInt64 get64(size_t n) const override - { - return data->get64(is_const ? 0 : n); - } - - UInt64 getUInt(size_t n) const override - { - return data->getUInt(is_const ? 0 : n); - } - - Int64 getInt(size_t n) const override - { - return data->getInt(is_const ? 0 : n); - } - - bool getBool(size_t n) const override - { - return data->getBool(is_const ? 0 : n); - } - - Float64 getFloat64(size_t n) const override - { - return data->getFloat64(is_const ? 0 : n); - } - - Float32 getFloat32(size_t n) const override - { - return data->getFloat32(is_const ? 0 : n); - } - - bool isDefaultAt(size_t n) const override - { - return data->isDefaultAt(is_const ? 0 : n); - } - - bool isNullAt(size_t n) const override - { - return data->isNullAt(is_const ? 0 : n); - } - -#if !defined(DEBUG_OR_SANITIZER_BUILD) - void insertRangeFrom(const IColumn & src, size_t start, size_t length) override -#else - void doInsertRangeFrom(const IColumn & src, size_t start, size_t length) override -#endif - { - if (length == 0) - return; - //if (s > 0 && (!is_const || !src.hasEqualValues() || src.getDataAt(0) != getDataAt(0))) - { - convertDataToFullColumn(); - auto src_full = src.convertToFullColumnIfConst(); - data->insertRangeFrom(*src_full, start, length); - } - s += length; - } - - void insert(const Field & field) override - { - convertDataToFullColumn(); - data->insert(field); - ++s; - } - - bool tryInsert(const Field & field) override - { - convertDataToFullColumn(); - if (!data->tryInsert(field)) - return false; - ++s; - return true; - } - - void insertData(const char * pos, size_t length) override - { - convertDataToFullColumn(); - data->insertData(pos, length); - ++s; - } - -#if !defined(DEBUG_OR_SANITIZER_BUILD) - void insertFrom(const IColumn & src, size_t position) override -#else - void doInsertFrom(const IColumn & src, size_t position) override -#endif - { - convertDataToFullColumn(); - data->insertFrom(src, position); - ++s; - } - -#if !defined(DEBUG_OR_SANITIZER_BUILD) - void insertManyFrom(const IColumn & src, size_t position, size_t length) override -#else - void doInsertManyFrom(const IColumn & src, size_t position, size_t length) override -#endif - { - convertDataToFullColumn(); - data->insertManyFrom(src, position, length); - s += length; - } - - void insertDefault() override - { - convertDataToFullColumn(); - data->insertDefault(); - ++s; - } - - void popBack(size_t n) override - { - if (!is_const) - data->popBack(n); - s -= n; - } - - StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override - { - return data->serializeValueIntoArena(is_const ? 0 : n, arena, begin); - } - - char * serializeValueIntoMemory(size_t n, char * memory) const override - { - return data->serializeValueIntoMemory(is_const ? 0 : n, memory); - } - - const char * deserializeAndInsertFromArena(const char * pos) override - { - const auto * res = data->deserializeAndInsertFromArena(pos); - if (is_const) - data->popBack(1); - ++s; - return res; - } - - const char * skipSerializedInArena(const char * pos) const override - { - return data->skipSerializedInArena(pos); - } - - void updateHashWithValue(size_t n, SipHash & hash) const override - { - data->updateHashWithValue(is_const ? 0 : n, hash); - } - - WeakHash32 getWeakHash32() const override; - - void updateHashFast(SipHash & hash) const override - { - data->updateHashFast(hash); - } - - ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override; - void expand(const Filter & mask, bool inverted) override; - - ColumnPtr replicate(const Offsets & offsets) const override; - ColumnPtr permute(const Permutation & perm, size_t limit) const override; - ColumnPtr index(const IColumn & indexes, size_t limit) const override; - void getPermutation(PermutationSortDirection direction, PermutationSortStability stability, - size_t limit, int nan_direction_hint, Permutation & res) const override; - void updatePermutation(PermutationSortDirection direction, PermutationSortStability stability, - size_t limit, int nan_direction_hint, Permutation & res, EqualRanges & equal_ranges) const override; - - size_t byteSize() const override - { - return data->byteSize() + sizeof(s) + sizeof(is_const); - } - - size_t byteSizeAt(size_t n) const override - { - return data->byteSizeAt(is_const ? 0 : n); - } - - size_t allocatedBytes() const override - { - return data->allocatedBytes() + sizeof(s) + sizeof(is_const); - } - -#if !defined(DEBUG_OR_SANITIZER_BUILD) - int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override -#else - int doCompareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override -#endif - { - if (!is_const) - return data->compareAt(n, m, rhs, nan_direction_hint); - return data->compareAt(0, 0, *assert_cast(rhs).data, nan_direction_hint); - } - - void compareColumn(const IColumn & rhs, size_t rhs_row_num, - PaddedPODArray * row_indexes, PaddedPODArray & compare_results, - int direction, int nan_direction_hint) const override; - - bool hasEqualValues() const override - { - if (is_const) - return true; - return data->hasEqualValues(); - } - - MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override; - - void gather(ColumnGathererStream &) override; - - void getExtremes(Field & min, Field & max) const override - { - data->getExtremes(min, max); - } - - void forEachSubcolumn(ColumnCallback callback) const override - { - callback(data); - } - - void forEachSubcolumnRecursively(RecursiveColumnCallback callback) const override - { - callback(*data); - data->forEachSubcolumnRecursively(callback); - } - - void forEachMutableSubcolumn(MutableColumnCallback callback) override - { - callback(data); - } - - void forEachMutableSubcolumnRecursively(RecursiveMutableColumnCallback callback) override - { - callback(*data); - data->forEachMutableSubcolumnRecursively(callback); - } - - bool structureEquals(const IColumn & rhs) const override - { - if (const auto * rhs_concrete = typeid_cast(&rhs)) - return data->structureEquals(*rhs_concrete->data); - return false; - } - - double getRatioOfDefaultRows(double sample_ratio) const override - { - if (!is_const) - return data->getRatioOfDefaultRows(sample_ratio); - return data->isDefaultAt(0) ? 1.0 : 0.0; - } - - UInt64 getNumberOfDefaultRows() const override - { - if (!is_const) - return data->getNumberOfDefaultRows(); - return data->isDefaultAt(0) ? s : 0; - } - - void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override - { - if (!is_const) - { - data->getIndicesOfNonDefaultRows(indices, from, limit); - return; - } - if (!data->isDefaultAt(0)) - { - size_t to = limit && from + limit < size() ? from + limit : size(); - indices.reserve_exact(indices.size() + to - from); - for (size_t i = from; i < to; ++i) - indices.push_back(i); - } - } - - bool isNullable() const override { return isColumnNullable(*data); } - bool onlyNull() const override - { - if (!is_const) - return data->onlyNull(); - return data->isNullAt(0); - } - - bool isNumeric() const override { return data->isNumeric(); } - bool isFixedAndContiguous() const override { return data->isFixedAndContiguous(); } - bool valuesHaveFixedSize() const override { return data->valuesHaveFixedSize(); } - size_t sizeOfValueIfFixed() const override { return data->sizeOfValueIfFixed(); } - std::string_view getRawData() const override { return data->getRawData(); } - - /// Not part of the common interface. - - const ColumnPtr & getDataColumnPtr() const { return data; } - - bool isCollationSupported() const override { return data->isCollationSupported(); } - - bool hasDynamicStructure() const override { return data->hasDynamicStructure(); } -}; - -ColumnConstable::Ptr createColumnConstable(const ColumnPtr & column, Field value); -ColumnConstable::Ptr createColumnConstable(const ColumnPtr & column, size_t const_value_index); -ColumnConstable::Ptr createColumnConstableWithDefaultValue(const ColumnPtr &column); - - -} diff --git a/src/Columns/IColumn.cpp b/src/Columns/IColumn.cpp index 1277c37f4767..5c926100e044 100644 --- a/src/Columns/IColumn.cpp +++ b/src/Columns/IColumn.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -828,7 +827,6 @@ template class IColumnHelper; template class IColumnHelper; template class IColumnHelper; template class IColumnHelper; -template class IColumnHelper; template class IColumnHelper; template class IColumnHelper; template class IColumnHelper; diff --git a/src/DataTypes/IDataType.cpp b/src/DataTypes/IDataType.cpp index ba0dd872d4f4..27031b2aaae1 100644 --- a/src/DataTypes/IDataType.cpp +++ b/src/DataTypes/IDataType.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include @@ -92,23 +91,12 @@ ColumnPtr IDataType::createColumnConst(size_t size, const Field & field) const return ColumnConst::create(std::move(column), size); } + ColumnPtr IDataType::createColumnConstWithDefaultValue(size_t size) const { return createColumnConst(size, getDefault()); } -ColumnPtr IDataType::createColumnConstable(size_t size, const Field & field) const -{ - auto column = createColumn(); - column->insert(field); - return ColumnConstable::create(std::move(column), size); -} - -ColumnPtr IDataType::createColumnConstableWithDefaultValue(size_t size) const -{ - return createColumnConstable(size, getDefault()); -} - DataTypePtr IDataType::promoteNumericType() const { throw Exception(ErrorCodes::DATA_TYPE_CANNOT_BE_PROMOTED, "Data type {} can't be promoted.", getName()); diff --git a/src/DataTypes/IDataType.h b/src/DataTypes/IDataType.h index c59c2b403959..c9b0b6193024 100644 --- a/src/DataTypes/IDataType.h +++ b/src/DataTypes/IDataType.h @@ -176,11 +176,6 @@ class IDataType : private boost::noncopyable, public std::enable_shared_from_thi virtual ColumnPtr createColumnConst(size_t size, const Field & field) const; ColumnPtr createColumnConstWithDefaultValue(size_t size) const; - /** Create ColumnConst for corresponding type, with specified size and value. - */ - virtual ColumnPtr createColumnConstable(size_t size, const Field & field) const; - ColumnPtr createColumnConstableWithDefaultValue(size_t size) const; - /** Get default value of data type. * It is the "default" default, regardless the fact that a table could contain different user-specified default. */ From 06e48e431c47784d1f32a763234d1d95267e2df3 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 14 Nov 2025 18:39:23 +0100 Subject: [PATCH 14/15] Fixes after review --- src/Storages/IStorageCluster.cpp | 49 +++++++++++++------------------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index d91a9221f005..20da6120a25e 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -229,9 +229,9 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { auto modified_query_tree = query_tree->clone(); - SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); - table_function_searcher.visit(modified_query_tree); - auto table_function_node = table_function_searcher.getNode(); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + left_table_expression_searcher.visit(modified_query_tree); + auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); @@ -239,29 +239,19 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto & query_node = modified_query_tree->as(); - if (info.has_join || info.has_cross_join) + if (info.has_join) { - if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION) - { - auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); - query_tree_distributed = buildTableFunctionQueryTree(table_function, context); - auto & table_function_ast = table_function->as(); - query_tree_distributed->setAlias(table_function_ast.alias); - } - else if (info.has_join) - { - auto join_node = query_node.getJoinTree(); - query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); - } - else - { - SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); - join_searcher.visit(modified_query_tree); - auto cross_join_node = join_searcher.getNode(); - if (!cross_join_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node"); - query_tree_distributed = cross_join_node->as()->getTableExpressions()[0]->clone(); - } + auto join_node = query_node.getJoinTree(); + query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + } + else if (info.has_cross_join) + { + SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); + join_searcher.visit(modified_query_tree); + auto cross_join_node = join_searcher.getNode(); + if (!cross_join_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node"); + query_tree_distributed = cross_join_node->as()->getTableExpressions()[0]->clone(); } // Find add used columns from table function to make proper projection list @@ -545,9 +535,9 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt info.has_cross_join = true; } - SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); - table_function_searcher.visit(query_tree); - auto table_function_node = table_function_searcher.getNode(); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + left_table_expression_searcher.visit(query_tree); + auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node"); @@ -560,8 +550,9 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt if (query_node.hasWhere()) collector_where.visit(query_node.getWhere()); - // SELECT x FROM datalake.table WHERE x IN local.table + // SELECT x FROM datalake.table WHERE x IN local.table. // Need to modify 'WHERE' on remote node if it contains columns from other sources + // because remote node might not have those sources. if (!collector_where.getColumns().empty()) info.has_local_columns_in_where = true; } From 6ceecc4b9e9a6988f99ae16d380f022918aa97eb Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 17 Nov 2025 12:41:52 +0100 Subject: [PATCH 15/15] Add tests --- src/Storages/IStorageCluster.cpp | 1 + tests/integration/test_database_iceberg/test.py | 14 ++++++++++++++ tests/integration/test_storage_iceberg/test.py | 14 ++++++++++++++ 3 files changed, 29 insertions(+) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 20da6120a25e..7d0c6fb97177 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -251,6 +251,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto cross_join_node = join_searcher.getNode(); if (!cross_join_node) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node"); + // CrossJoinNode contains vector of nodes. 0 is left expression, always exists. query_tree_distributed = cross_join_node->as()->getTableExpressions()[0]->clone(); } diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index f717f73f12ac..74279463220b 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -781,3 +781,17 @@ def test_cluster_joins(started_cluster): ) assert res == "Jack\nJohn\n" + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + CROSS JOIN `{table_name_local}` AS t2 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n" diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 38357f433387..c9c8ad58eff1 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3638,6 +3638,20 @@ def execute_spark_query(query: str, table_name): assert res == "jack\njohn\n" + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + CROSS JOIN `{TABLE_NAME_LOCAL}` AS t2 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tblack\njack\tsilver\njohn\tblack\njohn\tsilver\n" + @pytest.mark.parametrize("storage_type", ["s3"]) def test_system_tables_partition_sorting_keys(started_cluster, storage_type):