Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions src/Interpreters/PreparedSets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ FutureSetFromSubquery::FutureSetFromSubquery(
FutureSetFromSubquery::~FutureSetFromSubquery() = default;

/// Locking entry point: acquires `mutex` and forwards to get_unsafe().
/// Code that already holds `mutex` must call get_unsafe() directly instead,
/// because `mutex` is a non-recursive std::mutex.
SetPtr FutureSetFromSubquery::get() const
{
    std::lock_guard<std::mutex> guard(mutex);

    SetPtr result = get_unsafe();
    return result;
}

SetPtr FutureSetFromSubquery::get_unsafe() const
{
if (set_and_key->set != nullptr && set_and_key->set->isCreated())
return set_and_key->set;
Expand All @@ -181,20 +187,32 @@ SetPtr FutureSetFromSubquery::get() const

/// Takes ownership of the given query plan and updates the header of the
/// underlying set from the plan's current header. Runs under `mutex`.
void FutureSetFromSubquery::setQueryPlan(std::unique_ptr<QueryPlan> source_)
{
    std::lock_guard<std::mutex> guard(mutex);

    /// Store the plan first: the header below is read from the stored plan,
    /// since `source_` is moved-from after this line.
    source = std::move(source_);

    const auto & plan_header = source->getCurrentHeader();
    set_and_key->set->setHeader(plan_header->getColumnsWithTypeAndName());
}

void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_) { external_table = std::move(external_table_); }
/// Replaces the stored `external_table` with the given storage pointer.
/// The assignment is performed while holding `mutex`.
void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_)
{
    std::lock_guard<std::mutex> guard(mutex);

    this->external_table = std::move(external_table_);
}

/// Returns the element types reported by the underlying set
/// (`set_and_key->set`). The read is performed while holding `mutex`.
DataTypes FutureSetFromSubquery::getTypes() const
{
    std::lock_guard<std::mutex> guard(mutex);

    const auto & underlying_set = set_and_key->set;
    return underlying_set->getElementsTypes();
}

/// Returns the stored `hash` member. Unlike the neighbouring accessors this
/// one does not take `mutex`.
/// NOTE(review): presumably `hash` is never modified after construction — confirm.
FutureSet::Hash FutureSetFromSubquery::getHash() const
{
    return this->hash;
}

/// Locking entry point: acquires `mutex` and forwards both arguments to
/// build_unsafe(), returning whatever plan it produces (possibly nullptr).
/// Members that already hold `mutex` (buildSetInplace, buildOrderedSetInplace)
/// call build_unsafe() directly, because `mutex` is a non-recursive std::mutex.
std::unique_ptr<QueryPlan> FutureSetFromSubquery::build(
    const SizeLimits & network_transfer_limits,
    const PreparedSetsCachePtr & prepared_sets_cache)
{
    std::lock_guard<std::mutex> guard(mutex);

    auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache);
    return plan;
}

std::unique_ptr<QueryPlan> FutureSetFromSubquery::build_unsafe(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache)
{
if (set_and_key->set->isCreated())
return nullptr;
Expand All @@ -217,14 +235,16 @@ std::unique_ptr<QueryPlan> FutureSetFromSubquery::build(const SizeLimits & netwo

void FutureSetFromSubquery::buildSetInplace(const ContextPtr & context)
{
std::lock_guard lock(mutex);

if (external_table_set)
external_table_set->buildSetInplace(context);

const auto & settings = context->getSettingsRef();
SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]);
auto prepared_sets_cache = context->getPreparedSetsCache();

auto plan = build(network_transfer_limits, prepared_sets_cache);
auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache);

if (!plan)
return;
Expand All @@ -242,7 +262,9 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
if (!context->getSettingsRef()[Setting::use_index_for_in_with_subqueries])
return nullptr;

if (auto set = get())
std::lock_guard lock(mutex);

if (auto set = get_unsafe())
{
if (set->hasExplicitSetElements())
return set;
Expand All @@ -264,7 +286,7 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]);
auto prepared_sets_cache = context->getPreparedSetsCache();

auto plan = build(network_transfer_limits, prepared_sets_cache);
auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache);
if (!plan)
return nullptr;

Expand Down
7 changes: 7 additions & 0 deletions src/Interpreters/PreparedSets.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ class FutureSetFromSubquery final : public FutureSet
QueryPlan * getQueryPlan() { return source.get(); }

private:
SetPtr get_unsafe() const;
std::unique_ptr<QueryPlan> build_unsafe(
const SizeLimits & network_transfer_limits,
const PreparedSetsCachePtr & prepared_sets_cache);

Hash hash;
ASTPtr ast;
SetAndKeyPtr set_and_key;
Expand All @@ -178,6 +183,8 @@ class FutureSetFromSubquery final : public FutureSet

std::unique_ptr<QueryPlan> source;
QueryTreeNodePtr query_tree;

mutable std::mutex mutex;
};

using FutureSetFromSubqueryPtr = std::shared_ptr<FutureSetFromSubquery>;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join /*|| info.has_local_columns_in_where*/)
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
}

Expand Down
64 changes: 32 additions & 32 deletions tests/integration/test_database_iceberg/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,22 +734,22 @@ def test_cluster_joins(started_cluster):

assert res == "Jack\tSparrow\nJohn\tDow\n"

#res = node.query(
# f"""
# SELECT name
# FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
# WHERE tag in (
# SELECT id
# FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
# )
# ORDER BY ALL
# SETTINGS
# object_storage_cluster='cluster_simple',
# object_storage_cluster_join_mode='local'
# """
#)

#assert res == "Jack\nJohn\n"
res = node.query(
f"""
SELECT name
FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
WHERE tag in (
SELECT id
FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
)
ORDER BY ALL
SETTINGS
object_storage_cluster='cluster_simple',
object_storage_cluster_join_mode='local'
"""
)

assert res == "Jack\nJohn\n"

res = node.query(
f"""
Expand All @@ -767,22 +767,22 @@ def test_cluster_joins(started_cluster):

assert res == "Jack\tBlack\nJohn\tSilver\n"

#res = node.query(
# f"""
# SELECT name
# FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
# WHERE tag in (
# SELECT id
# FROM `{table_name_local}`
# )
# ORDER BY ALL
# SETTINGS
# object_storage_cluster='cluster_simple',
# object_storage_cluster_join_mode='local'
# """
#)

#assert res == "Jack\nJohn\n"
res = node.query(
f"""
SELECT name
FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
WHERE tag in (
SELECT id
FROM `{table_name_local}`
)
ORDER BY ALL
SETTINGS
object_storage_cluster='cluster_simple',
object_storage_cluster_join_mode='local'
"""
)

assert res == "Jack\nJohn\n"

res = node.query(
f"""
Expand Down
27 changes: 14 additions & 13 deletions tests/integration/test_s3_cluster/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,7 @@ def test_joins(started_cluster):
node = started_cluster.instances["s0_0_0"]

# Table join_table only exists on the node 's0_0_0'.
node.query("DROP TABLE IF EXISTS join_table SYNC")
node.query(
"""
CREATE TABLE IF NOT EXISTS join_table (
Expand Down Expand Up @@ -1163,19 +1164,19 @@ def test_joins(started_cluster):
res = list(map(str.split, result5.splitlines()))
assert len(res) == 6

#result6 = node.query(
# f"""
# SELECT name FROM
# s3Cluster('cluster_simple',
# 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
# 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
# WHERE value IN (SELECT id FROM join_table)
# ORDER BY name
# SETTINGS object_storage_cluster_join_mode='local';
# """
#)
#res = list(map(str.split, result6.splitlines()))
#assert len(res) == 25
result6 = node.query(
f"""
SELECT name FROM
s3Cluster('cluster_simple',
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
WHERE value IN (SELECT id FROM join_table)
ORDER BY name
SETTINGS object_storage_cluster_join_mode='local';
"""
)
res = list(map(str.split, result6.splitlines()))
assert len(res) == 25


def test_graceful_shutdown(started_cluster):
Expand Down
64 changes: 34 additions & 30 deletions tests/integration/test_storage_iceberg/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3551,6 +3551,10 @@ def execute_spark_query(query: str):

# Warm up metadata cache
for replica in started_cluster.instances.values():
replica.query("SYSTEM DROP UNCOMPRESSED CACHE")
replica.query("SYSTEM DROP QUERY CACHE")
replica.query("SYSTEM DROP FILESYSTEM CACHE")
replica.query("SYSTEM DROP ICEBERG METADATA CACHE")
replica.query(f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0")

all_data_expected_query_id = str(uuid.uuid4())
Expand Down Expand Up @@ -3738,22 +3742,22 @@ def execute_spark_query(query: str, table_name):

assert res == "jack\tsparrow\njohn\tdow\n"

#res = instance.query(
# f"""
# SELECT name
# FROM {creation_expression}
# WHERE tag in (
# SELECT id
# FROM {creation_expression_2}
# )
# ORDER BY ALL
# SETTINGS
# object_storage_cluster='cluster_simple',
# object_storage_cluster_join_mode='local'
# """
#)
res = instance.query(
f"""
SELECT name
FROM {creation_expression}
WHERE tag in (
SELECT id
FROM {creation_expression_2}
)
ORDER BY ALL
SETTINGS
object_storage_cluster='cluster_simple',
object_storage_cluster_join_mode='local'
"""
)

#assert res == "jack\njohn\n"
assert res == "jack\njohn\n"

res = instance.query(
f"""
Expand All @@ -3771,22 +3775,22 @@ def execute_spark_query(query: str, table_name):

assert res == "jack\tblack\njohn\tsilver\n"

#res = instance.query(
# f"""
# SELECT name
# FROM {creation_expression}
# WHERE tag in (
# SELECT id
# FROM `{TABLE_NAME_LOCAL}`
# )
# ORDER BY ALL
# SETTINGS
# object_storage_cluster='cluster_simple',
# object_storage_cluster_join_mode='local'
# """
#)
res = instance.query(
f"""
SELECT name
FROM {creation_expression}
WHERE tag in (
SELECT id
FROM `{TABLE_NAME_LOCAL}`
)
ORDER BY ALL
SETTINGS
object_storage_cluster='cluster_simple',
object_storage_cluster_join_mode='local'
"""
)

#assert res == "jack\njohn\n"
assert res == "jack\njohn\n"

res = instance.query(
f"""
Expand Down
Loading