10 changes: 9 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -396,7 +396,15 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
 }

     ASTPtr object_storage_type_arg;
-    configuration->extractDynamicStorageType(args, context, &object_storage_type_arg);
+    if (cluster_name_in_settings)
+        configuration->extractDynamicStorageType(args, context, &object_storage_type_arg);
+    else
+    {
+        auto args_copy = args;
+        // Remove cluster name from args to avoid confusing cluster name and named collection name
+        args_copy.erase(args_copy.begin());
+        configuration->extractDynamicStorageType(args_copy, context, &object_storage_type_arg);
+    }
     ASTPtr settings_temporary_storage = nullptr;
     for (auto * it = args.begin(); it != args.end(); ++it)
     {
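The new branch matters because the explicit-cluster syntax, e.g. icebergCluster('cluster_simple', ...), passes the cluster name as the first positional argument; left in place, extractDynamicStorageType could mistake it for a named collection name, exactly as the comment in the diff warns. A minimal Python sketch of the same idea, assuming a simplified flat-list argument model (detect_storage_type and KNOWN_TYPES are hypothetical illustrations, not the C++ API):

# Hypothetical model of the C++ logic above, for illustration only.
KNOWN_TYPES = {"s3", "azure", "local", "hdfs"}

def detect_storage_type(args, cluster_name_in_settings):
    """Find the storage type among engine args without being fooled
    by a leading cluster name."""
    if not cluster_name_in_settings:
        # The cluster name arrived positionally, e.g.
        #   icebergCluster('cluster_simple', s3_conf, ...)
        # Drop it so it is not treated as a named collection name.
        args = args[1:]
    return next((a for a in args if a in KNOWN_TYPES), None)

# Cluster name in SETTINGS: args already start at the storage config.
assert detect_storage_type(["s3", "http://minio1:9001/bucket"], True) == "s3"
# Cluster name as first argument: it must be skipped first.
assert detect_storage_type(["cluster_simple", "s3", "http://minio1:9001/bucket"], False) == "s3"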
9 changes: 6 additions & 3 deletions tests/integration/helpers/iceberg_utils.py
@@ -196,6 +196,7 @@ def get_creation_expression(
     explicit_metadata_path="",
     storage_type_as_arg=False,
     storage_type_in_named_collection=False,
+    cluster_name_as_literal=True,
     additional_settings = [],
     **kwargs,
 ):
@@ -224,6 +225,8 @@
     else:
         settings_expression = ""

+    cluster_name = "'cluster_simple'" if cluster_name_as_literal else "cluster_simple"
+
     storage_arg = storage_type
     engine_part = ""
     if (storage_type_in_named_collection):
@@ -252,7 +255,7 @@

         if run_on_cluster:
             assert table_function
-            return f"iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+            return f"iceberg{engine_part}Cluster({cluster_name}, {storage_arg}, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
         else:
             if table_function:
                 return f"iceberg{engine_part}({storage_arg}, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
@@ -271,7 +274,7 @@ def get_creation_expression(
         if run_on_cluster:
             assert table_function
             return f"""
-                iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+                iceberg{engine_part}Cluster({cluster_name}, {storage_arg}, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
             if table_function:
@@ -293,7 +296,7 @@ def get_creation_expression(
         if run_on_cluster:
             assert table_function
             return f"""
-                iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
+                iceberg{engine_part}Cluster({cluster_name}, {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
             if table_function:
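The cluster_name_as_literal flag makes the helper emit the cluster name either as a quoted string literal or as a bare identifier, so the test can exercise both spellings of the first argument that the *Cluster table functions accept. A rough sketch of the two expressions produced (the s3_conf named collection and the paths are illustrative placeholders, not values from the PR):

def cluster_arg(as_literal: bool) -> str:
    # Quoted -> parsed as a string literal; bare -> parsed as an identifier.
    return "'cluster_simple'" if as_literal else "cluster_simple"

for as_literal in (True, False):
    print(
        f"icebergS3Cluster({cluster_arg(as_literal)}, s3_conf, "
        "filename = 'iceberg_data/default/t/', format=Parquet, "
        "url = 'http://minio1:9001/root/')"
    )
# -> icebergS3Cluster('cluster_simple', s3_conf, ...)
# -> icebergS3Cluster(cluster_simple, s3_conf, ...)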
6 changes: 4 additions & 2 deletions tests/integration/test_storage_iceberg/test.py
@@ -412,7 +412,8 @@ def count_secondary_subqueries(started_cluster, query_id, expected, comment):

 @pytest.mark.parametrize("format_version", ["1", "2"])
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_cluster_table_function(started_cluster, format_version, storage_type):
+@pytest.mark.parametrize("cluster_name_as_literal", [True, False])
+def test_cluster_table_function(started_cluster, format_version, storage_type, cluster_name_as_literal):

     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
@@ -471,7 +472,7 @@ def add_df(mode):

     # Regular Query only node1
     table_function_expr = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, cluster_name_as_literal=cluster_name_as_literal
     )
     select_regular = (
         instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
@@ -492,6 +493,7 @@ def make_query_from_function(
             run_on_cluster=run_on_cluster,
             storage_type_as_arg=storage_type_as_arg,
             storage_type_in_named_collection=storage_type_in_named_collection,
+            cluster_name_as_literal=cluster_name_as_literal,
         )
         query_id = str(uuid.uuid4())
         settings = f"SETTINGS object_storage_cluster='cluster_simple'" if (alt_syntax and not run_on_cluster) else ""
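With the third parametrize decorator, pytest runs the test over the full cross product of format versions, storage backends, and cluster-name spellings. A standalone sketch of the resulting matrix (illustration only, not code from the PR):

from itertools import product

format_versions = ["1", "2"]
storage_types = ["s3", "azure", "local"]
literal_flags = [True, False]

cases = list(product(format_versions, storage_types, literal_flags))
print(len(cases))  # 12 combinations, up from the previous 6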