diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index 93e141c8c883..137081856663 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -434,17 +434,18 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
         configuration,
         configuration->createObjectStorage(context_copy, /* is_readonly */ false),
         StorageID(getDatabaseName(), name),
-        /* columns */columns,
-        /* constraints */ConstraintsDescription{},
-        /* partition_by */nullptr,
+        /* columns */ columns,
+        /* constraints */ ConstraintsDescription{},
+        /* partition_by */ nullptr,
         context_copy,
-        /* comment */"",
+        /* comment */ "",
         getFormatSettings(context_copy),
         LoadingStrictnessLevel::CREATE,
         getCatalog(),
-        /* if_not_exists*/true,
-        /* is_datalake_query*/true,
-        /* lazy_init */true);
+        /* if_not_exists */ true,
+        /* is_datalake_query */ true,
+        /* is_table_function */ true,
+        /* lazy_init */ true);
 }
 
 void DatabaseDataLake::dropTable( /// NOLINT
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 076ec5b5a28b..79eca5401a15 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -288,6 +288,8 @@ void IStorageCluster::read(
         return;
     }
 
+    updateConfigurationIfNeeded(context);
+
     storage_snapshot->check(column_names);
 
     const auto & settings = context->getSettingsRef();
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 362f938d120b..3f328d699a2d 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -97,6 +97,8 @@ class IStorageCluster : public IStorage
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
     }
 
+    virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}
+
 private:
     static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);
 
diff --git a/src/Storages/ObjectStorage/Local/Configuration.cpp b/src/Storages/ObjectStorage/Local/Configuration.cpp
index 9ff67e836342..58eeac06b0e2 100644
--- a/src/Storages/ObjectStorage/Local/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Local/Configuration.cpp
@@ -81,4 +81,20 @@ StorageObjectStorageQuerySettings StorageLocalConfiguration::getQuerySettings(co
         .ignore_non_existent_file = false};
 }
 
+ASTPtr StorageLocalConfiguration::createArgsWithAccessData() const
+{
+    auto arguments = std::make_shared<ASTExpressionList>();
+
+    arguments->children.push_back(std::make_shared<ASTLiteral>(path.path));
+    if (getFormat() != "auto")
+        arguments->children.push_back(std::make_shared<ASTLiteral>(getFormat()));
+    if (getStructure() != "auto")
+        arguments->children.push_back(std::make_shared<ASTLiteral>(getStructure()));
+    if (getCompressionMethod() != "auto")
+        arguments->children.push_back(std::make_shared<ASTLiteral>(getCompressionMethod()));
+
+    return arguments;
+}
+
+
 }
diff --git a/src/Storages/ObjectStorage/Local/Configuration.h b/src/Storages/ObjectStorage/Local/Configuration.h
index 231e33f84d35..207d297147c1 100644
--- a/src/Storages/ObjectStorage/Local/Configuration.h
+++ b/src/Storages/ObjectStorage/Local/Configuration.h
@@ -60,6 +60,8 @@ class StorageLocalConfiguration : public StorageObjectStorageConfiguration
 
     void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }
 
+    ASTPtr createArgsWithAccessData() const override;
+
 private:
     void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
     void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index efae7b129d7a..c4f2f3797056 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -91,6 +91,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
     std::shared_ptr catalog,
     bool if_not_exists,
     bool is_datalake_query,
+    bool is_table_function,
     bool lazy_init)
     : IStorageCluster(
         cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
@@ -145,6 +146,10 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
         tryLogCurrentException(log_);
     }
 
+    // Tables (unlike table functions) need to update the configuration on each read,
+    // because the data may have changed since the previous update.
+    update_configuration_on_read_write = !is_table_function;
+
     ColumnsDescription columns{columns_in_table_or_function_definition};
     std::string sample_path;
     if (need_resolve_columns_or_format)
@@ -295,6 +300,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
         {"IcebergS3", "icebergS3"},
         {"IcebergAzure", "icebergAzure"},
         {"IcebergHDFS", "icebergHDFS"},
+        {"IcebergLocal", "icebergLocal"},
         {"DeltaLake", "deltaLake"},
         {"DeltaLakeS3", "deltaLakeS3"},
         {"DeltaLakeAzure", "deltaLakeAzure"},
@@ -416,6 +422,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
         {"icebergS3", "icebergS3Cluster"},
         {"icebergAzure", "icebergAzureCluster"},
         {"icebergHDFS", "icebergHDFSCluster"},
+        {"icebergLocal", "icebergLocalCluster"},
         {"deltaLake", "deltaLakeCluster"},
         {"deltaLakeS3", "deltaLakeS3Cluster"},
         {"deltaLakeAzure", "deltaLakeAzureCluster"},
@@ -741,6 +748,18 @@ IDataLakeMetadata * StorageObjectStorageCluster::getExternalMetadata(ContextPtr
     return configuration->getExternalMetadata();
 }
 
+void StorageObjectStorageCluster::updateConfigurationIfNeeded(ContextPtr context)
+{
+    if (update_configuration_on_read_write)
+    {
+        configuration->update(
+            object_storage,
+            context,
+            /* if_not_updated_before */false,
+            /* check_consistent_with_previous_metadata */false);
+    }
+}
+
 void StorageObjectStorageCluster::checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const
 {
     if (getClusterName(context).empty())
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index f8b05846f09c..35ac5a4937e2 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -25,7 +25,8 @@ class StorageObjectStorageCluster : public IStorageCluster
         std::shared_ptr catalog,
         bool if_not_exists,
         bool is_datalake_query,
-        bool lazy_init = false);
+        bool is_table_function,
+        bool lazy_init);
 
     std::string getName() const override;
 
@@ -154,6 +155,8 @@ class StorageObjectStorageCluster : public IStorageCluster
         ContextPtr context,
         bool async_insert) override;
 
+    void updateConfigurationIfNeeded(ContextPtr context) override;
+
     /*
     In case the table was created with `object_storage_cluster` setting,
     modify the AST query object so that it uses the table function implementation
@@ -176,6 +179,7 @@ class StorageObjectStorageCluster : public IStorageCluster
     /// non-clustered storage to fall back on pure realisation if needed
     std::shared_ptr pure_storage;
 
+    bool update_configuration_on_read_write;
 };
 
 }
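The two files above carry the behavioural core of the patch: IStorageCluster::read() now calls a virtual updateConfigurationIfNeeded() hook, and StorageObjectStorageCluster enables it only for table definitions (update_configuration_on_read_write = !is_table_function). A minimal sketch of the intended effect, written in the style of the integration tests further down; the table name, the Spark catalog path, and the upload callback are illustrative assumptions, not part of the patch:

# Sketch: a CREATE TABLE'd Iceberg/data-lake table now refreshes its configuration on
# every SELECT, so snapshots written outside ClickHouse become visible without
# re-creating the table. Table functions keep resolving their configuration once.
def read_sees_external_write(instance, spark, upload_data_dir):
    before = int(instance.query("SELECT count() FROM iceberg_table_demo"))  # hypothetical table

    # Write a new snapshot outside ClickHouse and sync the data directory to the node.
    spark.sql("INSERT INTO spark_catalog.db.iceberg_table_demo VALUES (1)")  # illustrative
    upload_data_dir()

    # read() -> updateConfigurationIfNeeded() -> configuration->update(...) runs here.
    after = int(instance.query("SELECT count() FROM iceberg_table_demo"))
    assert after == before + 1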
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h b/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h
index 3163e517542b..ef00c15750ab 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h
@@ -155,6 +155,13 @@ struct IcebergHDFSClusterDefinition
     static constexpr auto non_clustered_storage_engine_name = IcebergHDFSDefinition::storage_engine_name;
 };
 
+struct IcebergLocalClusterDefinition
+{
+    static constexpr auto name = "icebergLocalCluster";
+    static constexpr auto storage_engine_name = "IcebergLocalCluster";
+    static constexpr auto non_clustered_storage_engine_name = IcebergLocalDefinition::storage_engine_name;
+};
+
 struct DeltaLakeClusterDefinition
 {
     static constexpr auto name = "deltaLakeCluster";
diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
index bc1b96c7198b..cc031aad68a1 100644
--- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
@@ -98,7 +98,9 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
         args.mode,
         configuration->getCatalog(context, args.query.attach),
         args.query.if_not_exists,
-        /* is_datalake_query*/ false);
+        /* is_datalake_query */ false,
+        /* is_table_function */ false,
+        /* lazy_init */ false);
 }
 
 #endif
diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp
index a7b8762a7e6b..145581a35989 100644
--- a/src/TableFunctions/TableFunctionObjectStorage.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -204,9 +204,11 @@ StoragePtr TableFunctionObjectStorage::
         /* comment */ String{},
         /* format_settings */ std::nullopt, /// No format_settings
         /* mode */ LoadingStrictnessLevel::CREATE,
-        configuration->getCatalog(context, /*attach*/ false),
+        configuration->getCatalog(context, /* attach */ false),
         /* if_not_exists */ false,
-        /* is_datalake_query*/ false);
+        /* is_datalake_query */ false,
+        /* is_table_function */ true,
+        /* lazy_init */ false);
 
     storage->startup();
     return storage;
@@ -296,6 +298,7 @@ template class TableFunctionObjectStorage;
+template class TableFunctionObjectStorage<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration>;
 #endif
 
 #if USE_AVRO && USE_AWS_S3
@@ -334,13 +337,4 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
         .allow_readonly = false});
 }
 #endif
-
-
-void registerDataLakeTableFunctions(TableFunctionFactory & factory)
-{
-    UNUSED(factory);
-#if USE_AVRO
-    registerTableFunctionIceberg(factory);
-#endif
-}
 }
diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
index 5d5a6fa15134..b6567ce3d760 100644
--- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
@@ -76,7 +76,8 @@ StoragePtr TableFunctionObjectStorageCluster(
+    factory.registerFunction<TableFunctionIcebergLocalCluster>(
+        {.documentation
+             = {.description = R"(The table function can be used to read the Iceberg table stored on shared storage in parallel for many nodes in a specified cluster.)",
+                .examples{{IcebergLocalClusterDefinition::name, "SELECT * FROM icebergLocalCluster(cluster, filename, format, [,compression])", ""}},
+                .category = FunctionDocumentation::Category::TableFunction},
+         .allow_readonly = false});
+
 #    if USE_AWS_S3
     factory.registerFunction(
         {.documentation
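With the definition and the registrations in place (the plain icebergLocal fallback is added in TableFunctionObjectStorageClusterFallback.cpp below), the new function can be used either explicitly or through the object_storage_cluster setting. A usage sketch in the integration-test style of this patch; the path and format arguments are placeholders, only the SQL shapes come from the registered .examples strings:

# Both statements should distribute the Iceberg scan over the cluster. The second one
# calls the plain icebergLocal() function and opts into cluster execution via the
# object_storage_cluster setting handled by the fallback registration further down.
# Path and format values are illustrative, not taken from the patch.
def query_both_ways(instance, table_name):
    explicit = instance.query(
        f"SELECT count() FROM icebergLocalCluster('cluster_simple', "
        f"'/iceberg_data/default/{table_name}/', 'Parquet')"
    )
    via_setting = instance.query(
        f"SELECT count() FROM icebergLocal('/iceberg_data/default/{table_name}/', 'Parquet') "
        f"SETTINGS object_storage_cluster = 'cluster_simple'"
    )
    assert explicit == via_setting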
diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h
index 2529c36da62b..06044e480590 100644
--- a/src/TableFunctions/TableFunctionObjectStorageCluster.h
+++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h
@@ -62,6 +62,7 @@ using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster;
+using TableFunctionIcebergLocalCluster = TableFunctionObjectStorageCluster<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration>;
 #endif
 
 #if USE_AVRO && USE_AWS_S3
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
index c0fbea05a3f5..0ec8b178f6a3 100644
--- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
@@ -67,6 +67,13 @@ struct IcebergHDFSClusterFallbackDefinition
     static constexpr auto storage_engine_cluster_name = "IcebergHDFSCluster";
 };
 
+struct IcebergLocalClusterFallbackDefinition
+{
+    static constexpr auto name = "icebergLocal";
+    static constexpr auto storage_engine_name = "Local";
+    static constexpr auto storage_engine_cluster_name = "IcebergLocalCluster";
+};
+
 struct DeltaLakeClusterFallbackDefinition
 {
     static constexpr auto name = "deltaLake";
@@ -163,6 +170,7 @@ using TableFunctionHDFSClusterFallback = TableFunctionObjectStorageClusterFallba
 #if USE_AVRO
 using TableFunctionIcebergClusterFallback = TableFunctionObjectStorageClusterFallback;
+using TableFunctionIcebergLocalClusterFallback = TableFunctionObjectStorageClusterFallback<TableFunctionIcebergLocalCluster>;
 #endif
 
 #if USE_AVRO && USE_AWS_S3
@@ -286,6 +294,27 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
         .allow_readonly = false
         }
     );
+
+    factory.registerFunction<TableFunctionIcebergLocalClusterFallback>(
+    {
+        .documentation = {
+            .description=R"(The table function can be used to read the Iceberg table stored on shared disk in parallel for many nodes in a specified cluster or from single node.)",
+            .examples{
+                {
+                    "icebergLocal",
+                    "SELECT * FROM icebergLocal(filename)", ""
+                },
+                {
+                    "icebergLocal",
+                    "SELECT * FROM icebergLocal(filename) "
+                    "SETTINGS object_storage_cluster='cluster'", ""
+                },
+            },
+            .category = FunctionDocumentation::Category::TableFunction
+        },
+        .allow_readonly = false
+    }
+    );
 #endif
 
 #if USE_AVRO && USE_AWS_S3
diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp
index 2f2709ed3a93..c8497fbfdda2 100644
--- a/src/TableFunctions/registerTableFunctions.cpp
+++ b/src/TableFunctions/registerTableFunctions.cpp
@@ -69,7 +69,6 @@ void registerTableFunctions()
     registerTableFunctionObjectStorage(factory);
     registerTableFunctionObjectStorageCluster(factory);
    registerTableFunctionObjectStorageClusterFallback(factory);
-    registerDataLakeTableFunctions(factory);
     registerDataLakeClusterTableFunctions(factory);
 
 #if USE_YTSAURUS
diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h
index c1dcb14568d7..84f0418bc4e1 100644
--- a/src/TableFunctions/registerTableFunctions.h
+++ b/src/TableFunctions/registerTableFunctions.h
@@ -70,7 +70,6 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory);
-void registerDataLakeTableFunctions(TableFunctionFactory & factory);
 void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
 
 void registerTableFunctionTimeSeries(TableFunctionFactory & factory);
diff --git a/tests/integration/helpers/iceberg_utils.py b/tests/integration/helpers/iceberg_utils.py
index 4cb0fd142dd3..9d0abb6fecea 100644
--- a/tests/integration/helpers/iceberg_utils.py
+++ b/tests/integration/helpers/iceberg_utils.py
@@ -288,22 +288,26 @@ def get_creation_expression(
         )
 
     elif storage_type == "local":
-        assert not run_on_cluster
-
-        if table_function:
+        if run_on_cluster:
+            assert table_function
             return f"""
-                iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
+                iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
-            return (
-                f"""
-                DROP TABLE IF EXISTS {table_name};
-                CREATE TABLE {if_not_exists_prefix} {table_name} {schema}
-                ENGINE=Iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
-                {partition_by}
-                {settings_expression}
+            if table_function:
+                return f"""
+                    iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
                 """
-            )
+            else:
+                return (
+                    f"""
+                    DROP TABLE IF EXISTS {table_name};
+                    CREATE TABLE {if_not_exists_prefix} {table_name} {schema}
+                    ENGINE=Iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
+                    {partition_by}
+                    {settings_expression}
+                    """
+                )
 
     else:
         raise Exception(f"Unknown iceberg storage type: {storage_type}")
@@ -473,6 +477,17 @@ def default_upload_directory(
         raise Exception(f"Unknown iceberg storage type: {storage_type}")
 
 
+def additional_upload_directory(
+    started_cluster, node, storage_type, local_path, remote_path, **kwargs
+):
+    if storage_type == "local":
+        return LocalUploader(started_cluster.instances[node]).upload_directory(
+            local_path, remote_path, **kwargs
+        )
+    else:
+        raise Exception(f"Unknown iceberg storage type for additional uploading: {storage_type}")
+
+
 def default_download_directory(
     started_cluster, storage_type, remote_path, local_path, **kwargs
 ):
@@ -485,7 +500,7 @@ def default_download_directory(
 
 
 def execute_spark_query_general(
-    spark, started_cluster, storage_type: str, table_name: str, query: str
+    spark, started_cluster, storage_type: str, table_name: str, query: str, additional_nodes=None
 ):
     spark.sql(query)
     default_upload_directory(
@@ -494,8 +509,18 @@ def execute_spark_query_general(
         f"/iceberg_data/default/{table_name}/",
         f"/iceberg_data/default/{table_name}/",
     )
+    additional_nodes = additional_nodes or []
+    for node in additional_nodes:
+        additional_upload_directory(
+            started_cluster,
+            node,
+            storage_type,
+            f"/iceberg_data/default/{table_name}/",
+            f"/iceberg_data/default/{table_name}/",
+        )
     return
 
+
 def get_last_snapshot(path_to_table):
     import json
     import os
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 791234dece09..25d6f8f66f10 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -38,6 +38,7 @@
 from helpers.iceberg_utils import (
     default_upload_directory,
+    additional_upload_directory,
     default_download_directory,
     execute_spark_query_general,
     get_creation_expression,
@@ -410,7 +411,7 @@ def count_secondary_subqueries(started_cluster, query_id, expected, comment):
 
 
 @pytest.mark.parametrize("format_version", ["1", "2"])
-@pytest.mark.parametrize("storage_type", ["s3", "azure"])
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
 def test_cluster_table_function(started_cluster, format_version, storage_type):
     instance = started_cluster.instances["node1"]
@@ -443,6 +444,19 @@ def add_df(mode):
 
         logging.info(f"Adding another dataframe. result files: {files}")
 
+        if storage_type == "local":
+            # For local storage we need to upload data from each node
+            for node_name, replica in started_cluster.instances.items():
+                if node_name == "node1":
+                    continue
+                additional_upload_directory(
+                    started_cluster,
+                    node_name,
+                    storage_type,
+                    f"/iceberg_data/default/{TABLE_NAME}/",
+                    f"/iceberg_data/default/{TABLE_NAME}/",
+                )
+
         return files
 
     files = add_df(mode="overwrite")
@@ -480,7 +494,7 @@ def make_query_from_function(
             storage_type_in_named_collection=storage_type_in_named_collection,
         )
         query_id = str(uuid.uuid4())
-        settings = "SETTINGS object_storage_cluster='cluster_simple'" if alt_syntax else ""
+        settings = f"SETTINGS object_storage_cluster='cluster_simple'" if (alt_syntax and not run_on_cluster) else ""
        if remote:
             query = f"SELECT * FROM remote('node2', {expr}) {settings}"
         else:
@@ -1214,7 +1228,7 @@ def test_filesystem_cache(started_cluster, storage_type):
 
 @pytest.mark.parametrize(
     "storage_type, run_on_cluster",
-    [("s3", False), ("s3", True), ("azure", False), ("azure", True), ("local", False)],
+    [("s3", False), ("s3", True), ("azure", False), ("azure", True), ("local", False), ("local", True)],
 )
 def test_partition_pruning(started_cluster, storage_type, run_on_cluster):
     instance = started_cluster.instances["node1"]
@@ -1228,6 +1242,7 @@ def execute_spark_query(query: str):
             storage_type,
             TABLE_NAME,
             query,
+            additional_nodes=["node2", "node3"] if storage_type=="local" else [],
         )
 
     execute_spark_query(
@@ -2053,8 +2068,6 @@ def test_explicit_metadata_file(started_cluster, storage_type):
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
 @pytest.mark.parametrize("run_on_cluster", [False, True])
 def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster):
-    if run_on_cluster and storage_type == "local":
-        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str()
@@ -2066,6 +2079,7 @@ def execute_spark_query(query: str):
             storage_type,
             TABLE_NAME,
             query,
+            additional_nodes=["node2", "node3"] if storage_type=="local" else [],
         )
 
     execute_spark_query(
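For completeness, a condensed sketch of how the new test plumbing fits together. It assumes the fixtures of test_storage_iceberg/test.py and the get_creation_expression keyword arguments used elsewhere in these tests; the Spark INSERT statement is illustrative:

# Local storage is per-node, so the Iceberg data directory must exist on every replica;
# execute_spark_query_general() now copies it to the extra nodes through
# additional_upload_directory() when additional_nodes is passed.
def run_local_cluster_query(spark, started_cluster, instance, table_name):
    execute_spark_query_general(
        spark,
        started_cluster,
        "local",
        table_name,
        f"INSERT INTO spark_catalog.db.{table_name} VALUES (42)",  # illustrative query
        additional_nodes=["node2", "node3"],
    )
    # Build an icebergLocalCluster(...) expression the same way the parametrized tests do.
    expr = get_creation_expression(
        "local", table_name, started_cluster, table_function=True, run_on_cluster=True
    )
    return instance.query(f"SELECT count() FROM {expr}")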