From 1d51d0799a006e36ff00f156ae02c48f17c62f0a Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 15 Jan 2025 16:17:05 +0100 Subject: [PATCH 1/3] Alternative syntax for object storage cluster functions --- src/Core/Settings.cpp | 9 + src/Core/SettingsChangesHistory.cpp | 2 + .../StorageObjectStorageCluster.cpp | 16 +- .../StorageObjectStorageCluster.h | 3 + .../TableFunctionObjectStorage.cpp | 97 ------ .../TableFunctionObjectStorageCluster.cpp | 1 + .../TableFunctionObjectStorageCluster.h | 2 - ...leFunctionObjectStorageClusterFallback.cpp | 325 ++++++++++++++++++ ...ableFunctionObjectStorageClusterFallback.h | 49 +++ src/TableFunctions/registerTableFunctions.cpp | 1 + src/TableFunctions/registerTableFunctions.h | 1 + tests/integration/test_s3_cluster/test.py | 157 +++++++++ .../test_storage_azure_blob_storage/test.py | 6 +- .../test_cluster.py | 57 ++- tests/integration/test_storage_hdfs/test.py | 49 ++- .../integration/test_storage_iceberg/test.py | 51 ++- 16 files changed, 708 insertions(+), 118 deletions(-) create mode 100644 src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp create mode 100644 src/TableFunctions/TableFunctionObjectStorageClusterFallback.h diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index d527360c6f10..59afa3291d52 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5924,6 +5924,15 @@ Allow to create database with Engine=MaterializedPostgreSQL(...). /** Experimental feature for moving data between shards. */ \ DECLARE(Bool, allow_experimental_query_deduplication, false, R"( Experimental data deduplication for SELECT queries based on part UUIDs +)", EXPERIMENTAL) \ + DECLARE(String, object_storage_cluster_function_cluster, "", R"( +Cluster to make distributed requests to object storages with alternative syntax. +)", EXPERIMENTAL) \ + DECLARE(UInt64, object_storage_cluster_function_max_hosts, 0, R"( +Limit for hosts used for request in object storage cluster table functions - azureBlobStorageCluster, s3Cluster, hdfsCluster, etc. +Possible values: +- Positive integer. +- 0 — All hosts in cluster. 
)", EXPERIMENTAL) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 8b1fc4008189..ef4c09ea4568 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -71,6 +71,8 @@ static std::initializer_listgetEngineName(), table_id_.table_name))) , configuration{configuration_} , object_storage(object_storage_) + , cluster_name_in_settings(false) { ColumnsDescription columns{columns_}; std::string sample_path; @@ -105,10 +106,17 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( configuration->getEngineName()); } - ASTPtr cluster_name_arg = args.front(); - args.erase(args.begin()); - configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true); - args.insert(args.begin(), cluster_name_arg); + if (cluster_name_in_settings) + { + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true); + } + else + { + ASTPtr cluster_name_arg = args.front(); + args.erase(args.begin()); + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true); + args.insert(args.begin(), cluster_name_arg); + } } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 0088ff28fc22..57bd92222af1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -29,6 +29,8 @@ class StorageObjectStorageCluster : public IStorageCluster String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context); + void setClusterNameInSettings(bool cluster_name_in_settings_) { cluster_name_in_settings = cluster_name_in_settings_; } + private: void updateQueryToSendIfNeeded( ASTPtr & query, @@ -39,6 +41,7 @@ class StorageObjectStorageCluster : public IStorageCluster const StorageObjectStorage::ConfigurationPtr configuration; const ObjectStoragePtr object_storage; NamesAndTypesList virtual_columns; + bool cluster_name_in_settings; }; } diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 1ed803ae5ceb..7207c500b96a 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -130,17 +130,6 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) { UNUSED(factory); #if USE_AWS_S3 - factory.registerFunction>( - { - .documentation = - { - .description=R"(The table function can be used to read the data stored on AWS S3.)", - .examples{{"s3", "SELECT * FROM s3(url, access_key_id, secret_access_key)", ""} - }, - .categories{"DataLake"}}, - .allow_readonly = false - }); - factory.registerFunction>( { .documentation = @@ -173,38 +162,6 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) .allow_readonly = false }); #endif - -#if USE_AZURE_BLOB_STORAGE - factory.registerFunction>( - { - .documentation = - { - .description=R"(The table function can be used to read the data stored on Azure Blob Storage.)", - .examples{ - { - "azureBlobStorage", - "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, " - "[account_name, account_key, format, compression, 
structure])", "" - }} - }, - .allow_readonly = false - }); -#endif -#if USE_HDFS - factory.registerFunction>( - { - .documentation = - { - .description=R"(The table function can be used to read the data stored on HDFS virtual filesystem.)", - .examples{ - { - "hdfs", - "SELECT * FROM hdfs(url, format, compression, structure])", "" - }} - }, - .allow_readonly = false - }); -#endif } #if USE_AZURE_BLOB_STORAGE @@ -256,29 +213,6 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory) .examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}}, .categories{"DataLake"}}, .allow_readonly = false}); - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)", - .examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}}, - .categories{"DataLake"}}, - .allow_readonly = false}); - -#endif -#if USE_AZURE_BLOB_STORAGE - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)", - .examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}}, - .categories{"DataLake"}}, - .allow_readonly = false}); -#endif -#if USE_HDFS - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem.)", - .examples{{"icebergHDFS", "SELECT * FROM icebergHDFS(url)", ""}}, - .categories{"DataLake"}}, - .allow_readonly = false}); #endif factory.registerFunction( {.documentation @@ -290,42 +224,11 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory) #endif -#if USE_AWS_S3 -#if USE_PARQUET -void registerTableFunctionDeltaLake(TableFunctionFactory & factory) -{ - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the DeltaLake table stored on object store.)", - .examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}}, - .categories{"DataLake"}}, - .allow_readonly = false}); -} -#endif - -void registerTableFunctionHudi(TableFunctionFactory & factory) -{ - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Hudi table stored on object store.)", - .examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}}, - .categories{"DataLake"}}, - .allow_readonly = false}); -} - -#endif - void registerDataLakeTableFunctions(TableFunctionFactory & factory) { UNUSED(factory); #if USE_AVRO registerTableFunctionIceberg(factory); #endif -#if USE_AWS_S3 -#if USE_PARQUET - registerTableFunctionDeltaLake(factory); -#endif - registerTableFunctionHudi(factory); -#endif } } diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index be7603f18e61..54d5c101826f 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -22,6 +22,7 @@ StoragePtr TableFunctionObjectStorageCluster::execute auto configuration = Base::getConfiguration(); ColumnsDescription columns; + if (configuration->structure != "auto") columns = parseColumnsListFromString(configuration->structure, context); else if (!Base::structure_hint.empty()) diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h 
b/src/TableFunctions/TableFunctionObjectStorageCluster.h index d1fac5fdc2c1..16cea9dfdafc 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -10,8 +10,6 @@ namespace DB class Context; -class StorageS3Settings; -class StorageAzureBlobSettings; class StorageS3Configuration; class StorageAzureConfiguration; diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp new file mode 100644 index 000000000000..94408ecb80fb --- /dev/null +++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp @@ -0,0 +1,325 @@ +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace Setting +{ + extern const SettingsString object_storage_cluster_function_cluster; +} + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +struct S3ClusterFallbackDefinition +{ + static constexpr auto name = "s3"; + static constexpr auto storage_type_name = "S3"; + static constexpr auto storage_type_cluster_name = "S3Cluster"; +}; + +struct AzureClusterFallbackDefinition +{ + static constexpr auto name = "azureBlobStorage"; + static constexpr auto storage_type_name = "Azure"; + static constexpr auto storage_type_cluster_name = "AzureBlobStorageCluster"; +}; + +struct HDFSClusterFallbackDefinition +{ + static constexpr auto name = "hdfs"; + static constexpr auto storage_type_name = "HDFS"; + static constexpr auto storage_type_cluster_name = "HDFSCluster"; +}; + +struct IcebergS3ClusterFallbackDefinition +{ + static constexpr auto name = "icebergS3"; + static constexpr auto storage_type_name = "S3"; + static constexpr auto storage_type_cluster_name = "IcebergS3Cluster"; +}; + +struct IcebergAzureClusterFallbackDefinition +{ + static constexpr auto name = "icebergAzure"; + static constexpr auto storage_type_name = "Azure"; + static constexpr auto storage_type_cluster_name = "IcebergAzureCluster"; +}; + +struct IcebergHDFSClusterFallbackDefinition +{ + static constexpr auto name = "icebergHDFS"; + static constexpr auto storage_type_name = "HDFS"; + static constexpr auto storage_type_cluster_name = "IcebergHDFSCluster"; +}; + +struct DeltaLakeClusterFallbackDefinition +{ + static constexpr auto name = "deltaLake"; + static constexpr auto storage_type_name = "S3"; + static constexpr auto storage_type_cluster_name = "DeltaLakeS3Cluster"; +}; + +struct HudiClusterFallbackDefinition +{ + static constexpr auto name = "hudi"; + static constexpr auto storage_type_name = "S3"; + static constexpr auto storage_type_cluster_name = "HudiS3Cluster"; +}; + +template +void TableFunctionObjectStorageClusterFallback::parseArgumentsImpl(ASTs & args, const ContextPtr & context) +{ + if (args.empty()) + throw Exception( + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "The function {} should have arguments. 
The first argument must be the cluster name and the rest are the arguments of " + "corresponding table function", + getName()); + + const auto & settings = context->getSettingsRef(); + + is_cluster_function = !settings[Setting::object_storage_cluster_function_cluster].value.empty(); + + if (is_cluster_function) + { + ASTPtr cluster_name_arg = std::make_shared(settings[Setting::object_storage_cluster_function_cluster].value); + args.insert(args.begin(), cluster_name_arg); + BaseCluster::parseArgumentsImpl(args, context); + args.erase(args.begin()); + } + else + BaseSimple::parseArgumentsImpl(args, context); +} + +template +StoragePtr TableFunctionObjectStorageClusterFallback::executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + bool is_insert_query) const +{ + if (is_cluster_function) + { + auto result = BaseCluster::executeImpl(ast_function, context, table_name, cached_columns, is_insert_query); + if (auto storage = typeid_cast>(result)) + storage->setClusterNameInSettings(true); + return result; + } + else + return BaseSimple::executeImpl(ast_function, context, table_name, cached_columns, is_insert_query); +} + +#if USE_AWS_S3 +using TableFunctionS3ClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AZURE_BLOB_STORAGE +using TableFunctionAzureClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_HDFS +using TableFunctionHDFSClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AVRO && USE_AWS_S3 +using TableFunctionIcebergS3ClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AVRO && USE_AZURE_BLOB_STORAGE +using TableFunctionIcebergAzureClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AVRO && USE_HDFS +using TableFunctionIcebergHDFSClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AWS_S3 && USE_PARQUET +using TableFunctionDeltaLakeClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AWS_S3 +using TableFunctionHudiClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory) +{ + UNUSED(factory); +#if USE_AWS_S3 + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the data stored on S3 in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + {"s3", "SELECT * FROM s3(url, format, structure)", ""}, + {"s3", "SELECT * FROM s3(url, format, structure) SETTINGS object_storage_cluster_function_cluster='cluster'", ""} + }, + }, + .allow_readonly = false + } + ); +#endif + +#if USE_AZURE_BLOB_STORAGE + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the data stored on Azure Blob Storage in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + { + "azureBlobStorage", + "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, " + "[account_name, account_key, format, compression, structure])", "" + }, + { + "azureBlobStorage", + "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, " + "[account_name, account_key, format, compression, structure]) " + "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + }, + } + }, + .allow_readonly = false + } + ); +#endif 
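A usage sketch, not part of the diff itself: going by the new setting's description and the header comment this patch adds for TableFunctionObjectStorageClusterFallback, the two queries below should behave the same when the setting names an existing cluster. The URL, credentials and schema here are placeholders, and the setting is later renamed to `object_storage_cluster` in the third commit of this series.

-- pre-existing explicit cluster syntax
SELECT count(*) FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32');

-- alternative syntax introduced by this patch: plain s3() plus a setting
SELECT count(*) FROM s3('http://minio1:9001/root/data/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32')
SETTINGS object_storage_cluster_function_cluster = 'cluster_simple';

With the setting left at its default empty value, the same s3() call falls back to the ordinary single-node table function.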
+ +#if USE_HDFS + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the data stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + { + "hdfs", + "SELECT * FROM hdfs(url, format, compression, structure])", "" + }, + { + "hdfs", + "SELECT * FROM hdfs(url, format, compression, structure]) " + "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + }, + } + }, + .allow_readonly = false + } + ); +#endif + +#if USE_AVRO && USE_AWS_S3 + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + { + "icebergS3", + "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", "" + }, + { + "icebergS3", + "SELECT * FROM icebergS3(url, access_key_id, secret_access_key) " + "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + }, + } + }, + .allow_readonly = false + } + ); +#endif + +#if USE_AVRO && USE_AZURE_BLOB_STORAGE + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + { + "icebergAzure", + "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", "" + }, + { + "icebergAzure", + "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key) " + "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + }, + } + }, + .allow_readonly = false + } + ); +#endif + +#if USE_AVRO && USE_HDFS + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + { + "icebergHDFS", + "SELECT * FROM icebergHDFS(url)", "" + }, + { + "icebergHDFS", + "SELECT * FROM icebergHDFS(url) SETTINGS object_storage_cluster_function_cluster='cluster'", "" + }, + } + }, + .allow_readonly = false + } + ); +#endif + +#if USE_AWS_S3 && USE_PARQUET + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + { + "deltaLake", + "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", "" + }, + { + "deltaLake", + "SELECT * FROM deltaLake(url, access_key_id, secret_access_key) " + "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + }, + } + }, + .allow_readonly = false + } + ); +#endif + +#if USE_AWS_S3 + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + { + "hudi", + "SELECT * FROM hudi(url, access_key_id, secret_access_key)", "" + }, + { + "hudi", + "SELECT * FROM hudi(url, access_key_id, secret_access_key) SETTINGS object_storage_cluster_function_cluster='cluster'", "" + }, + } + }, + .allow_readonly = false + } + ); +#endif +} + +} diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h new file mode 100644 index 000000000000..5485f08d54da 
--- /dev/null +++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h @@ -0,0 +1,49 @@ +#pragma once +#include "config.h" +#include + +namespace DB +{ + +/** +* Class implementing s3/hdfs/azureBlobStorage(...) table functions, +* which allow to use simple or distributed function variant based on settings. +* If setting `object_storage_cluster_function_cluster` is empty, +* simple single-host variant is used, if setting not empty, cluster variant is used. +* `SELECT * FROM s3('s3://...', ...) SETTINGS object_storage_cluster_function_cluster='cluster'` +* is equal to +* `SELECT * FROM s3Cluster('cluster', 's3://...', ...)` +*/ + +template +class TableFunctionObjectStorageClusterFallback : public Base +{ +public: + using BaseCluster = Base; + using BaseSimple = BaseCluster::Base; + + virtual ~TableFunctionObjectStorageClusterFallback() override = default; + + static constexpr auto name = Definition::name; + + String getName() const override { return name; } + +private: + const char * getStorageTypeName() const override + { + return is_cluster_function ? Definition::storage_type_cluster_name : Definition::storage_type_name; + } + + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + bool is_insert_query) const override; + + void parseArgumentsImpl(ASTs & args, const ContextPtr & context) override; + + bool is_cluster_function = false; +}; + +} diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp index 131ca783f73f..c7b852b96fc8 100644 --- a/src/TableFunctions/registerTableFunctions.cpp +++ b/src/TableFunctions/registerTableFunctions.cpp @@ -65,6 +65,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]] registerTableFunctionObjectStorage(factory); registerTableFunctionObjectStorageCluster(factory); + registerTableFunctionObjectStorageClusterFallback(factory); registerDataLakeTableFunctions(factory); registerDataLakeClusterTableFunctions(factory); } diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h index 1168a8ef739e..fee7ca911fa7 100644 --- a/src/TableFunctions/registerTableFunctions.h +++ b/src/TableFunctions/registerTableFunctions.h @@ -69,6 +69,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory); void registerTableFunctionObjectStorage(TableFunctionFactory & factory); void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory); +void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory); void registerDataLakeTableFunctions(TableFunctionFactory & factory); void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory); diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index e8bf031021e2..828ee0774d84 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -124,8 +124,16 @@ def test_select_all(started_cluster): 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)""" ) # print(s3_distributed) + s3_distributed_alt_syntax = node.query( + """ + SELECT * from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + SETTINGS object_storage_cluster_function_cluster = 
'cluster_simple'""" + ) assert TSV(pure_s3) == TSV(s3_distributed) + assert TSV(pure_s3) == TSV(s3_distributed_alt_syntax) def test_count(started_cluster): @@ -146,8 +154,17 @@ def test_count(started_cluster): 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')""" ) # print(s3_distributed) + s3_distributed_alt_syntax = node.query( + """ + SELECT count(*) from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) assert TSV(pure_s3) == TSV(s3_distributed) + assert TSV(pure_s3) == TSV(s3_distributed_alt_syntax) def test_count_macro(started_cluster): @@ -169,8 +186,17 @@ def test_count_macro(started_cluster): 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')""" ) # print(s3_distributed) + s3_distributed_alt_syntax = node.query( + """ + SELECT count(*) from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) assert TSV(s3_macro) == TSV(s3_distributed) + assert TSV(s3_macro) == TSV(s3_distributed_alt_syntax) def test_union_all(started_cluster): @@ -211,8 +237,25 @@ def test_union_all(started_cluster): """ ) # print(s3_distributed) + s3_distributed_alt_syntax = node.query( + """ + SELECT * FROM + ( + SELECT * from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + UNION ALL + SELECT * from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + ) + ORDER BY (name, value, polygon) + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple' + """ + ) assert TSV(pure_s3) == TSV(s3_distributed) + assert TSV(pure_s3) == TSV(s3_distributed_alt_syntax) def test_wrong_cluster(started_cluster): @@ -233,6 +276,21 @@ def test_wrong_cluster(started_cluster): assert "not found" in error + error = node.query_and_get_error( + """ + SELECT count(*) from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + UNION ALL + SELECT count(*) from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + SETTINGS object_storage_cluster_function_cluster = 'non_existing_cluster' + """ + ) + + assert "not found" in error + def test_ambiguous_join(started_cluster): node = started_cluster.instances["s0_0_0"] @@ -266,6 +324,17 @@ def test_skip_unavailable_shards(started_cluster): assert result == "10\n" + result = node.query( + """ + SELECT count(*) from s3( + 'http://minio1:9001/root/data/clickhouse/part1.csv', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + SETTINGS skip_unavailable_shards = 1, object_storage_cluster_function_cluster = 'cluster_non_existent_port' + """ + ) + + assert result == "10\n" + def test_unset_skip_unavailable_shards(started_cluster): # Although skip_unavailable_shards is not set, cluster table functions should always skip unavailable shards. 
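A related sketch, also not part of the diff: the Azure and HDFS tests added later in this series verify fan-out by counting distinct hostnames in system.query_log; a comparable manual check for the s3 case might look like the following, assuming the client runs the query under a known query_id and logs are flushed on every node first.

-- run the query with the alternative syntax under a known query_id
SELECT count(*) FROM s3('http://minio1:9001/root/data/clickhouse/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32')
SETTINGS object_storage_cluster_function_cluster = 'cluster_simple';

-- after SYSTEM FLUSH LOGS on each node, count how many hosts executed secondary queries
SELECT uniq(hostname)
FROM clusterAllReplicas('cluster_simple', system.query_log)
WHERE type = 'QueryFinish' AND initial_query_id = '<the query_id used above>';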
@@ -281,6 +350,17 @@ def test_unset_skip_unavailable_shards(started_cluster): assert result == "10\n" + result = node.query( + """ + SELECT count(*) from s3( + 'http://minio1:9001/root/data/clickhouse/part1.csv', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + SETTINGS object_storage_cluster_function_cluster = 'cluster_non_existent_port' + """ + ) + + assert result == "10\n" + def test_distributed_insert_select_with_replicated(started_cluster): first_replica_first_shard = started_cluster.instances["s0_0_0"] @@ -412,6 +492,20 @@ def test_cluster_with_header(started_cluster): ) == "SomeValue\n" ) + assert ( + node.query( + """SELECT * from s3('http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue')) + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + == "SomeValue\n" + ) + assert ( + node.query( + """SELECT * from s3('http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue'), 'CSV') + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + == "SomeValue\n" + ) def test_cluster_with_named_collection(started_cluster): @@ -431,6 +525,20 @@ def test_cluster_with_named_collection(started_cluster): assert TSV(pure_s3) == TSV(s3_cluster) + s3_cluster = node.query( + """SELECT * from s3(test_s3) ORDER BY (c1, c2, c3) + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + + assert TSV(pure_s3) == TSV(s3_cluster) + + s3_cluster = node.query( + """SELECT * from s3(test_s3, structure='auto') ORDER BY (c1, c2, c3) + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + + assert TSV(pure_s3) == TSV(s3_cluster) + def test_cluster_format_detection(started_cluster): node = started_cluster.instances["s0_0_0"] @@ -461,6 +569,20 @@ def test_cluster_format_detection(started_cluster): assert result == expected_result + result = node.query( + """SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123') order by c1, c2 + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + + assert result == expected_result + + result = node.query( + """SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', auto, 'a String, b UInt64') order by a, b + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + + assert result == expected_result + def test_cluster_default_expression(started_cluster): node = started_cluster.instances["s0_0_0"] @@ -508,3 +630,38 @@ def test_cluster_default_expression(started_cluster): ) assert result == expected_result + + result = node.query( + """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'JSONEachRow', 'id UInt32, date Date DEFAULT 18262') order by id + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + + assert result == expected_result + + result = node.query( + """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'auto', 'id UInt32, date Date DEFAULT 18262') order by id + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + + assert result == expected_result + + result = node.query( + """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'JSONEachRow', 'id UInt32, date Date DEFAULT 18262', 'auto') order by id + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + + assert result == expected_result + + result = node.query( + """SELECT * FROM 
s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'auto', 'id UInt32, date Date DEFAULT 18262', 'auto') order by id + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + + assert result == expected_result + + result = node.query( + """SELECT * FROM s3(test_s3_with_default) order by id + SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + ) + + assert result == expected_result diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index e22ce925f6c8..f49cd7e3f400 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -36,14 +36,14 @@ def cluster(): def azure_query( - node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None + node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None, query_id=None ): for i in range(try_num): try: if expect_error: - return node.query_and_get_error(query, settings=settings) + return node.query_and_get_error(query, settings=settings, query_id=query_id) else: - return node.query(query, settings=settings) + return node.query(query, settings=settings, query_id=query_id) except Exception as ex: retriable_errors = [ "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response", diff --git a/tests/integration/test_storage_azure_blob_storage/test_cluster.py b/tests/integration/test_storage_azure_blob_storage/test_cluster.py index aef6e426572e..ea6b9c512952 100644 --- a/tests/integration/test_storage_azure_blob_storage/test_cluster.py +++ b/tests/integration/test_storage_azure_blob_storage/test_cluster.py @@ -8,6 +8,7 @@ import random import threading import time +import uuid import pytest from azure.storage.blob import BlobServiceClient @@ -76,21 +77,64 @@ def test_select_all(cluster): ) print(get_azure_file_content("test_cluster_select_all.csv", port)) + query_id_pure = str(uuid.uuid4()) pure_azure = azure_query( node, f"SELECT * from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1'," f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV','auto')", + query_id=query_id_pure, ) print(pure_azure) + query_id_distributed = str(uuid.uuid4()) distributed_azure = azure_query( node, f"SELECT * from azureBlobStorageCluster('simple_cluster', '{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1'," f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV'," - f"'auto')" - "", + f"'auto')", + query_id=query_id_distributed, ) print(distributed_azure) + query_id_distributed_alt_syntax = str(uuid.uuid4()) + distributed_azure_alt_syntax = azure_query( + node, + f"SELECT * from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1'," + f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV'," + f"'auto') " + f"SETTINGS object_storage_cluster_function_cluster='simple_cluster'", + query_id=query_id_distributed_alt_syntax, + ) + print(distributed_azure_alt_syntax) assert TSV(pure_azure) == TSV(distributed_azure) + assert TSV(pure_azure) == TSV(distributed_azure_alt_syntax) + for _, node_ in cluster.instances.items(): + node_.query("SYSTEM FLUSH LOGS") + nodes_pure = node.query( + f""" + SELECT uniq(hostname) + FROM 
clusterAllReplicas('simple_cluster', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_pure}' + """, + ) + assert int(nodes_pure) == 1 + nodes_distributed = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('simple_cluster', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_distributed}' + """, + ) + assert int(nodes_distributed) == 3 + nodes_distributed_alt_syntax = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('simple_cluster', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_distributed_alt_syntax}' + """, + ) + assert int(nodes_distributed_alt_syntax) == 3 def test_count(cluster): @@ -120,7 +164,16 @@ def test_count(cluster): f"'auto', 'key UInt64')", ) print(distributed_azure) + distributed_azure_alt_syntax = azure_query( + node, + f"SELECT count(*) from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_count.csv', " + f"'devstoreaccount1','Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV'," + f"'auto', 'key UInt64')" + f"SETTINGS object_storage_cluster_function_cluster='simple_cluster'", + ) + print(distributed_azure_alt_syntax) assert TSV(pure_azure) == TSV(distributed_azure) + assert TSV(pure_azure) == TSV(distributed_azure_alt_syntax) def test_union_all(cluster): diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index d37b0d7218f6..632ca081c5ef 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -553,17 +553,58 @@ def test_hdfsCluster(started_cluster): hdfs_api.write_data("/test_hdfsCluster/file2", "2\n") hdfs_api.write_data("/test_hdfsCluster/file3", "3\n") + expected = "1\tfile1\ttest_hdfsCluster/file1\n2\tfile2\ttest_hdfsCluster/file2\n3\tfile3\ttest_hdfsCluster/file3\n" + query_id_pure = str(uuid.uuid4()) actual = node1.query( - "select id, _file as file_name, _path as file_path from hdfs('hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id" + "select id, _file as file_name, _path as file_path from hdfs('hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id", + query_id=query_id_pure, ) - expected = "1\tfile1\ttest_hdfsCluster/file1\n2\tfile2\ttest_hdfsCluster/file2\n3\tfile3\ttest_hdfsCluster/file3\n" assert actual == expected + query_id_cluster = str(uuid.uuid4()) actual = node1.query( - "select id, _file as file_name, _path as file_path from hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id" + "select id, _file as file_name, _path as file_path from hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id", + query_id=query_id_cluster, + ) + assert actual == expected + + query_id_cluster_alt_syntax = str(uuid.uuid4()) + actual = node1.query( + "select id, _file as file_name, _path as file_path from hdfs('hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id", + settings={"object_storage_cluster_function_cluster":"test_cluster_two_shards"}, + query_id=query_id_cluster_alt_syntax, ) - expected = "1\tfile1\ttest_hdfsCluster/file1\n2\tfile2\ttest_hdfsCluster/file2\n3\tfile3\ttest_hdfsCluster/file3\n" assert actual == expected + + node1.query("SYSTEM FLUSH LOGS") + queries_pure = node1.query( + f""" + SELECT count() + FROM system.query_log + WHERE type='QueryFinish' + AND 
initial_query_id='{query_id_pure}' + """ + ) + assert int(queries_pure) == 1 + queries_cluster = node1.query( + f""" + SELECT count() + FROM system.query_log + WHERE type='QueryFinish' + AND initial_query_id='{query_id_cluster}' + """ + ) + assert int(queries_cluster) == 3 + queries_cluster_alt_syntax = node1.query( + f""" + SELECT count() + FROM system.query_log + WHERE type='QueryFinish' + AND initial_query_id='{query_id_cluster_alt_syntax}' + """ + ) + assert int(queries_cluster_alt_syntax) == 3 + fs.delete(dir, recursive=True) diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index cd79aacd5345..23552aabaa77 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -639,16 +639,37 @@ def add_df(mode): table_function=True, run_on_cluster=True, ) + query_id_cluster = str(uuid.uuid4()) select_cluster = ( - instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split() + instance.query( + f"SELECT * FROM {table_function_expr_cluster}", query_id=query_id_cluster + ) + .strip() + .split() + ) + + # Cluster Query with node1 as coordinator with alternative syntax + query_id_cluster_alt_syntax = str(uuid.uuid4()) + select_cluster_alt_syntax = ( + instance.query( + f""" + SELECT * FROM {table_function_expr} + SETTINGS object_storage_cluster_function_cluster='cluster_simple' + """, + query_id=query_id_cluster_alt_syntax, + ) + .strip() + .split() ) # Simple size check assert len(select_regular) == 600 assert len(select_cluster) == 600 + assert len(select_cluster_alt_syntax) == 600 # Actual check assert select_cluster == select_regular + assert select_cluster_alt_syntax == select_regular # Check query_log for replica in started_cluster.instances.values(): @@ -660,11 +681,29 @@ def add_df(mode): f""" SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log WHERE - type = 'QueryStart' AND - positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND - position(query, '{TABLE_NAME}') != 0 AND - position(query, 'system.query_log') = 0 AND - NOT is_initial_query + type = 'QueryStart' + AND NOT is_initial_query + AND initial_query_id='{query_id_cluster}' + """ + ) + .strip() + .split("\n") + ) + + logging.info( + f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}" + ) + assert len(cluster_secondary_queries) == 1 + + for node_name, replica in started_cluster.instances.items(): + cluster_secondary_queries = ( + replica.query( + f""" + SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log + WHERE + type = 'QueryStart' + AND NOT is_initial_query + AND initial_query_id='{query_id_cluster_alt_syntax}' """ ) .strip() From 59552593a7409bb689b0bbcf57d4c308f704ccce Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 17 Jan 2025 11:20:04 +0100 Subject: [PATCH 2/3] Fix build --- src/TableFunctions/TableFunctionObjectStorageClusterFallback.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h index 5485f08d54da..9c7afffb6ed3 100644 --- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h +++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h @@ -22,8 +22,6 @@ class TableFunctionObjectStorageClusterFallback : public Base using BaseCluster = Base; using BaseSimple = BaseCluster::Base; - virtual ~TableFunctionObjectStorageClusterFallback() override = 
default; - static constexpr auto name = Definition::name; String getName() const override { return name; } From c88f3d3589a5b4010eeae8b5f2fe245309fac027 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 17 Jan 2025 17:12:09 +0100 Subject: [PATCH 3/3] Rename settings --- src/Core/Settings.cpp | 4 +-- src/Core/SettingsChangesHistory.cpp | 4 +-- ...leFunctionObjectStorageClusterFallback.cpp | 22 ++++++------ ...ableFunctionObjectStorageClusterFallback.h | 4 +-- tests/integration/test_s3_cluster/test.py | 36 +++++++++---------- .../test_cluster.py | 4 +-- tests/integration/test_storage_hdfs/test.py | 2 +- .../integration/test_storage_iceberg/test.py | 2 +- 8 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 59afa3291d52..0b4bd1af28cf 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5925,10 +5925,10 @@ Allow to create database with Engine=MaterializedPostgreSQL(...). DECLARE(Bool, allow_experimental_query_deduplication, false, R"( Experimental data deduplication for SELECT queries based on part UUIDs )", EXPERIMENTAL) \ - DECLARE(String, object_storage_cluster_function_cluster, "", R"( + DECLARE(String, object_storage_cluster, "", R"( Cluster to make distributed requests to object storages with alternative syntax. )", EXPERIMENTAL) \ - DECLARE(UInt64, object_storage_cluster_function_max_hosts, 0, R"( + DECLARE(UInt64, object_storage_max_nodes, 0, R"( Limit for hosts used for request in object storage cluster table functions - azureBlobStorageCluster, s3Cluster, hdfsCluster, etc. Possible values: - Positive integer. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index ef4c09ea4568..d47763370d84 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -71,8 +71,8 @@ static std::initializer_list::parseArguments const auto & settings = context->getSettingsRef(); - is_cluster_function = !settings[Setting::object_storage_cluster_function_cluster].value.empty(); + is_cluster_function = !settings[Setting::object_storage_cluster].value.empty(); if (is_cluster_function) { - ASTPtr cluster_name_arg = std::make_shared(settings[Setting::object_storage_cluster_function_cluster].value); + ASTPtr cluster_name_arg = std::make_shared(settings[Setting::object_storage_cluster].value); args.insert(args.begin(), cluster_name_arg); BaseCluster::parseArgumentsImpl(args, context); args.erase(args.begin()); @@ -159,7 +159,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa .description=R"(The table function can be used to read the data stored on S3 in parallel for many nodes in a specified cluster or from single node.)", .examples{ {"s3", "SELECT * FROM s3(url, format, structure)", ""}, - {"s3", "SELECT * FROM s3(url, format, structure) SETTINGS object_storage_cluster_function_cluster='cluster'", ""} + {"s3", "SELECT * FROM s3(url, format, structure) SETTINGS object_storage_cluster='cluster'", ""} }, }, .allow_readonly = false @@ -182,7 +182,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa "azureBlobStorage", "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, " "[account_name, account_key, format, compression, structure]) " - "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + "SETTINGS object_storage_cluster='cluster'", "" }, } }, @@ -204,7 +204,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & 
fa { "hdfs", "SELECT * FROM hdfs(url, format, compression, structure]) " - "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + "SETTINGS object_storage_cluster='cluster'", "" }, } }, @@ -226,7 +226,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa { "icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key) " - "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + "SETTINGS object_storage_cluster='cluster'", "" }, } }, @@ -248,7 +248,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa { "icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key) " - "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + "SETTINGS object_storage_cluster='cluster'", "" }, } }, @@ -269,7 +269,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa }, { "icebergHDFS", - "SELECT * FROM icebergHDFS(url) SETTINGS object_storage_cluster_function_cluster='cluster'", "" + "SELECT * FROM icebergHDFS(url) SETTINGS object_storage_cluster='cluster'", "" }, } }, @@ -291,7 +291,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa { "deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key) " - "SETTINGS object_storage_cluster_function_cluster='cluster'", "" + "SETTINGS object_storage_cluster='cluster'", "" }, } }, @@ -312,7 +312,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa }, { "hudi", - "SELECT * FROM hudi(url, access_key_id, secret_access_key) SETTINGS object_storage_cluster_function_cluster='cluster'", "" + "SELECT * FROM hudi(url, access_key_id, secret_access_key) SETTINGS object_storage_cluster='cluster'", "" }, } }, diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h index 9c7afffb6ed3..afa6b8b49f11 100644 --- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h +++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h @@ -8,9 +8,9 @@ namespace DB /** * Class implementing s3/hdfs/azureBlobStorage(...) table functions, * which allow to use simple or distributed function variant based on settings. -* If setting `object_storage_cluster_function_cluster` is empty, +* If setting `object_storage_cluster` is empty, * simple single-host variant is used, if setting not empty, cluster variant is used. -* `SELECT * FROM s3('s3://...', ...) SETTINGS object_storage_cluster_function_cluster='cluster'` +* `SELECT * FROM s3('s3://...', ...) 
SETTINGS object_storage_cluster='cluster'` * is equal to * `SELECT * FROM s3Cluster('cluster', 's3://...', ...)` */ diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 828ee0774d84..69ed202cbe4f 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -129,7 +129,7 @@ def test_select_all(started_cluster): SELECT * from s3( 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert TSV(pure_s3) == TSV(s3_distributed) @@ -160,7 +160,7 @@ def test_count(started_cluster): 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert TSV(pure_s3) == TSV(s3_distributed) @@ -192,7 +192,7 @@ def test_count_macro(started_cluster): 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert TSV(s3_macro) == TSV(s3_distributed) @@ -250,7 +250,7 @@ def test_union_all(started_cluster): 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ) ORDER BY (name, value, polygon) - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple' + SETTINGS object_storage_cluster = 'cluster_simple' """ ) @@ -285,7 +285,7 @@ def test_wrong_cluster(started_cluster): SELECT count(*) from s3( 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - SETTINGS object_storage_cluster_function_cluster = 'non_existing_cluster' + SETTINGS object_storage_cluster = 'non_existing_cluster' """ ) @@ -329,7 +329,7 @@ def test_skip_unavailable_shards(started_cluster): SELECT count(*) from s3( 'http://minio1:9001/root/data/clickhouse/part1.csv', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - SETTINGS skip_unavailable_shards = 1, object_storage_cluster_function_cluster = 'cluster_non_existent_port' + SETTINGS skip_unavailable_shards = 1, object_storage_cluster = 'cluster_non_existent_port' """ ) @@ -355,7 +355,7 @@ def test_unset_skip_unavailable_shards(started_cluster): SELECT count(*) from s3( 'http://minio1:9001/root/data/clickhouse/part1.csv', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - SETTINGS object_storage_cluster_function_cluster = 'cluster_non_existent_port' + SETTINGS object_storage_cluster = 'cluster_non_existent_port' """ ) @@ -495,14 +495,14 @@ def test_cluster_with_header(started_cluster): assert ( node.query( """SELECT * from s3('http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue')) - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) == "SomeValue\n" ) assert ( node.query( """SELECT * from s3('http://resolver:8080/bucket/key.csv', 
headers(MyCustomHeader = 'SomeValue'), 'CSV') - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) == "SomeValue\n" ) @@ -527,14 +527,14 @@ def test_cluster_with_named_collection(started_cluster): s3_cluster = node.query( """SELECT * from s3(test_s3) ORDER BY (c1, c2, c3) - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert TSV(pure_s3) == TSV(s3_cluster) s3_cluster = node.query( """SELECT * from s3(test_s3, structure='auto') ORDER BY (c1, c2, c3) - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert TSV(pure_s3) == TSV(s3_cluster) @@ -571,14 +571,14 @@ def test_cluster_format_detection(started_cluster): result = node.query( """SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123') order by c1, c2 - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert result == expected_result result = node.query( """SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', auto, 'a String, b UInt64') order by a, b - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert result == expected_result @@ -633,35 +633,35 @@ def test_cluster_default_expression(started_cluster): result = node.query( """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'JSONEachRow', 'id UInt32, date Date DEFAULT 18262') order by id - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert result == expected_result result = node.query( """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'auto', 'id UInt32, date Date DEFAULT 18262') order by id - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert result == expected_result result = node.query( """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'JSONEachRow', 'id UInt32, date Date DEFAULT 18262', 'auto') order by id - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert result == expected_result result = node.query( """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'auto', 'id UInt32, date Date DEFAULT 18262', 'auto') order by id - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert result == expected_result result = node.query( """SELECT * FROM s3(test_s3_with_default) order by id - SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'""" + SETTINGS object_storage_cluster = 'cluster_simple'""" ) assert result == expected_result diff --git a/tests/integration/test_storage_azure_blob_storage/test_cluster.py b/tests/integration/test_storage_azure_blob_storage/test_cluster.py index ea6b9c512952..54e1ced79577 100644 --- a/tests/integration/test_storage_azure_blob_storage/test_cluster.py +++ b/tests/integration/test_storage_azure_blob_storage/test_cluster.py @@ -100,7 +100,7 @@ def test_select_all(cluster): f"SELECT * from azureBlobStorage('{storage_account_url}', 
'cont', 'test_cluster_select_all.csv', 'devstoreaccount1'," f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV'," f"'auto') " - f"SETTINGS object_storage_cluster_function_cluster='simple_cluster'", + f"SETTINGS object_storage_cluster='simple_cluster'", query_id=query_id_distributed_alt_syntax, ) print(distributed_azure_alt_syntax) @@ -169,7 +169,7 @@ def test_count(cluster): f"SELECT count(*) from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_count.csv', " f"'devstoreaccount1','Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV'," f"'auto', 'key UInt64')" - f"SETTINGS object_storage_cluster_function_cluster='simple_cluster'", + f"SETTINGS object_storage_cluster='simple_cluster'", ) print(distributed_azure_alt_syntax) assert TSV(pure_azure) == TSV(distributed_azure) diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index 632ca081c5ef..35e4aa131505 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -571,7 +571,7 @@ def test_hdfsCluster(started_cluster): query_id_cluster_alt_syntax = str(uuid.uuid4()) actual = node1.query( "select id, _file as file_name, _path as file_path from hdfs('hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id", - settings={"object_storage_cluster_function_cluster":"test_cluster_two_shards"}, + settings={"object_storage_cluster":"test_cluster_two_shards"}, query_id=query_id_cluster_alt_syntax, ) assert actual == expected diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 23552aabaa77..dc7a4e28d7cb 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -654,7 +654,7 @@ def add_df(mode): instance.query( f""" SELECT * FROM {table_function_expr} - SETTINGS object_storage_cluster_function_cluster='cluster_simple' + SETTINGS object_storage_cluster='cluster_simple' """, query_id=query_id_cluster_alt_syntax, )