diff --git a/src/Interpreters/PreparedSets.cpp b/src/Interpreters/PreparedSets.cpp index 3eedfb308820..1f2680eee375 100644 --- a/src/Interpreters/PreparedSets.cpp +++ b/src/Interpreters/PreparedSets.cpp @@ -172,6 +172,12 @@ FutureSetFromSubquery::FutureSetFromSubquery( FutureSetFromSubquery::~FutureSetFromSubquery() = default; SetPtr FutureSetFromSubquery::get() const +{ + std::lock_guard lock(mutex); + return get_unsafe(); +} + +SetPtr FutureSetFromSubquery::get_unsafe() const { if (set_and_key->set != nullptr && set_and_key->set->isCreated()) return set_and_key->set; @@ -181,20 +187,32 @@ SetPtr FutureSetFromSubquery::get() const void FutureSetFromSubquery::setQueryPlan(std::unique_ptr source_) { + std::lock_guard lock(mutex); source = std::move(source_); set_and_key->set->setHeader(source->getCurrentHeader()->getColumnsWithTypeAndName()); } -void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_) { external_table = std::move(external_table_); } +void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_) +{ + std::lock_guard lock(mutex); + external_table = std::move(external_table_); +} DataTypes FutureSetFromSubquery::getTypes() const { + std::lock_guard lock(mutex); return set_and_key->set->getElementsTypes(); } FutureSet::Hash FutureSetFromSubquery::getHash() const { return hash; } std::unique_ptr FutureSetFromSubquery::build(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache) +{ + std::lock_guard lock(mutex); + return build_unsafe(network_transfer_limits, prepared_sets_cache); +} + +std::unique_ptr FutureSetFromSubquery::build_unsafe(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache) { if (set_and_key->set->isCreated()) return nullptr; @@ -217,6 +235,8 @@ std::unique_ptr FutureSetFromSubquery::build(const SizeLimits & netwo void FutureSetFromSubquery::buildSetInplace(const ContextPtr & context) { + std::lock_guard lock(mutex); + if (external_table_set) external_table_set->buildSetInplace(context); @@ -224,7 +244,7 @@ void FutureSetFromSubquery::buildSetInplace(const ContextPtr & context) SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]); auto prepared_sets_cache = context->getPreparedSetsCache(); - auto plan = build(network_transfer_limits, prepared_sets_cache); + auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache); if (!plan) return; @@ -242,7 +262,9 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context) if (!context->getSettingsRef()[Setting::use_index_for_in_with_subqueries]) return nullptr; - if (auto set = get()) + std::lock_guard lock(mutex); + + if (auto set = get_unsafe()) { if (set->hasExplicitSetElements()) return set; @@ -264,7 +286,7 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context) SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]); auto prepared_sets_cache = context->getPreparedSetsCache(); - auto plan = build(network_transfer_limits, prepared_sets_cache); + auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache); if (!plan) return nullptr; diff --git a/src/Interpreters/PreparedSets.h b/src/Interpreters/PreparedSets.h index cfd7435833f8..3f7ef9752edd 100644 --- a/src/Interpreters/PreparedSets.h +++ b/src/Interpreters/PreparedSets.h @@ -170,6 +170,11 @@ class FutureSetFromSubquery final : public FutureSet QueryPlan * getQueryPlan() { return source.get(); } private: + SetPtr get_unsafe() const; + std::unique_ptr build_unsafe( + const SizeLimits & network_transfer_limits, + const PreparedSetsCachePtr & prepared_sets_cache); + Hash hash; ASTPtr ast; SetAndKeyPtr set_and_key; @@ -178,6 +183,8 @@ class FutureSetFromSubquery final : public FutureSet std::unique_ptr source; QueryTreeNodePtr query_tree; + + mutable std::mutex mutex; }; using FutureSetFromSubqueryPtr = std::shared_ptr; diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index b3a1f7653bd4..7d0c6fb97177 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -573,7 +573,7 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); auto info = getQueryTreeInfo(query_info.query_tree, context); - if (info.has_join || info.has_cross_join /*|| info.has_local_columns_in_where*/) + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) return QueryProcessingStage::Enum::FetchColumns; } diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index a92a1581dc48..5c517c1f183d 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -734,22 +734,22 @@ def test_cluster_joins(started_cluster): assert res == "Jack\tSparrow\nJohn\tDow\n" - #res = node.query( - # f""" - # SELECT name - # FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` - # WHERE tag in ( - # SELECT id - # FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` - # ) - # ORDER BY ALL - # SETTINGS - # object_storage_cluster='cluster_simple', - # object_storage_cluster_join_mode='local' - # """ - #) - - #assert res == "Jack\nJohn\n" + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" res = node.query( f""" @@ -767,22 +767,22 @@ def test_cluster_joins(started_cluster): assert res == "Jack\tBlack\nJohn\tSilver\n" - #res = node.query( - # f""" - # SELECT name - # FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` - # WHERE tag in ( - # SELECT id - # FROM `{table_name_local}` - # ) - # ORDER BY ALL - # SETTINGS - # object_storage_cluster='cluster_simple', - # object_storage_cluster_join_mode='local' - # """ - #) - - #assert res == "Jack\nJohn\n" + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM `{table_name_local}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" res = node.query( f""" diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 5cecf58f37b0..d607f390ba3c 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1049,6 +1049,7 @@ def test_joins(started_cluster): node = started_cluster.instances["s0_0_0"] # Table join_table only exists on the node 's0_0_0'. + node.query("DROP TABLE IF EXISTS join_table SYNC") node.query( """ CREATE TABLE IF NOT EXISTS join_table ( @@ -1163,19 +1164,19 @@ def test_joins(started_cluster): res = list(map(str.split, result5.splitlines())) assert len(res) == 6 - #result6 = node.query( - # f""" - # SELECT name FROM - # s3Cluster('cluster_simple', - # 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', - # 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - # WHERE value IN (SELECT id FROM join_table) - # ORDER BY name - # SETTINGS object_storage_cluster_join_mode='local'; - # """ - #) - #res = list(map(str.split, result6.splitlines())) - #assert len(res) == 25 + result6 = node.query( + f""" + SELECT name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + WHERE value IN (SELECT id FROM join_table) + ORDER BY name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + res = list(map(str.split, result6.splitlines())) + assert len(res) == 25 def test_graceful_shutdown(started_cluster): diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index a5f08571c862..444b47e90bd3 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3551,6 +3551,10 @@ def execute_spark_query(query: str): # Warm up metadata cache for replica in started_cluster.instances.values(): + replica.query("SYSTEM DROP UNCOMPRESSED CACHE") + replica.query("SYSTEM DROP QUERY CACHE") + replica.query("SYSTEM DROP FILESYSTEM CACHE") + replica.query("SYSTEM DROP ICEBERG METADATA CACHE") replica.query(f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0") all_data_expected_query_id = str(uuid.uuid4()) @@ -3738,22 +3742,22 @@ def execute_spark_query(query: str, table_name): assert res == "jack\tsparrow\njohn\tdow\n" - #res = instance.query( - # f""" - # SELECT name - # FROM {creation_expression} - # WHERE tag in ( - # SELECT id - # FROM {creation_expression_2} - # ) - # ORDER BY ALL - # SETTINGS - # object_storage_cluster='cluster_simple', - # object_storage_cluster_join_mode='local' - # """ - #) + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM {creation_expression_2} + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) - #assert res == "jack\njohn\n" + assert res == "jack\njohn\n" res = instance.query( f""" @@ -3771,22 +3775,22 @@ def execute_spark_query(query: str, table_name): assert res == "jack\tblack\njohn\tsilver\n" - #res = instance.query( - # f""" - # SELECT name - # FROM {creation_expression} - # WHERE tag in ( - # SELECT id - # FROM `{TABLE_NAME_LOCAL}` - # ) - # ORDER BY ALL - # SETTINGS - # object_storage_cluster='cluster_simple', - # object_storage_cluster_join_mode='local' - # """ - #) + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM `{TABLE_NAME_LOCAL}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) - #assert res == "jack\njohn\n" + assert res == "jack\njohn\n" res = instance.query( f"""