From cb6981979b6f87d83af21d236988643b19178100 Mon Sep 17 00:00:00 2001
From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com>
Date: Fri, 18 Jul 2025 14:10:41 +0200
Subject: [PATCH 1/6] Merge pull request #912 from
 Altinity/backports/25.3.6/822

Iceberg as alias for DataLakeCatalog with catalog_type='rest'
---
 src/Databases/DataLake/DataLakeConstants.h    |  1 +
 src/Databases/DataLake/DatabaseDataLake.cpp   |  6 ++++
 src/Parsers/ASTSetQuery.cpp                   |  3 +-
 .../integration/test_database_iceberg/test.py | 31 ++++++++++++-------
 4 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/src/Databases/DataLake/DataLakeConstants.h b/src/Databases/DataLake/DataLakeConstants.h
index eaa8f5a276e6..02f6a7dcfcd7 100644
--- a/src/Databases/DataLake/DataLakeConstants.h
+++ b/src/Databases/DataLake/DataLakeConstants.h
@@ -8,6 +8,7 @@ namespace DataLake
 {
 
 static constexpr auto DATABASE_ENGINE_NAME = "DataLakeCatalog";
+static constexpr auto DATABASE_ALIAS_NAME = "Iceberg";
 static constexpr std::string_view FILE_PATH_PREFIX = "file:/";
 
 /// Some catalogs (Unity or Glue) may store not only Iceberg/DeltaLake tables but other kinds of "tables"
diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index 7e647c0fd06e..1631dab31a2c 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -646,6 +646,11 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
         }
 
+        if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
+        {
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` supports only the `rest` catalog type");
+        }
+
         for (auto & engine_arg : engine_args)
             engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
@@ -724,6 +729,7 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
             std::move(engine_for_tables));
     };
     factory.registerDatabase("DataLakeCatalog", create_fn, { .supports_arguments = true, .supports_settings = true });
+    factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
 }
 
 }
diff --git a/src/Parsers/ASTSetQuery.cpp b/src/Parsers/ASTSetQuery.cpp
index bfda0d6f9b83..a672551ab8d7 100644
--- a/src/Parsers/ASTSetQuery.cpp
+++ b/src/Parsers/ASTSetQuery.cpp
@@ -129,7 +129,8 @@ void ASTSetQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & format,
             return true;
         }
 
-        if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name)
+        if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name
+            || DataLake::DATABASE_ALIAS_NAME == state.create_engine_name)
         {
             if (DataLake::SETTINGS_TO_HIDE.contains(change.name))
             {
diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py
index 10800a99cb1b..88b5e327293d 100644
--- a/tests/integration/test_database_iceberg/test.py
+++ b/tests/integration/test_database_iceberg/test.py
@@ -72,6 +72,8 @@
 
 DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
 
+AVAILABLE_ENGINES = ["DataLakeCatalog", "Iceberg"]
+
 
 def list_namespaces():
     response = requests.get(f"{BASE_URL_LOCAL}/namespaces")
@@ -122,7 +124,7 @@ def generate_record():
 
 
 def create_clickhouse_iceberg_database(
-    started_cluster, node, name, additional_settings={}
+    started_cluster, node, name, additional_settings={}, engine='DataLakeCatalog'
 ):
     settings = {
         "catalog_type": "rest",
@@ -136,7 +138,7 @@ def create_clickhouse_iceberg_database(
         f"""
 DROP DATABASE IF EXISTS {name};
 SET allow_experimental_database_iceberg=true;
-CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}')
+CREATE DATABASE {name} ENGINE = {engine}('{BASE_URL}', 'minio', '{minio_secret_key}')
 SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
 """
     )
@@ -169,7 +171,8 @@ def started_cluster():
         cluster.shutdown()
 
 
-def test_list_tables(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_list_tables(started_cluster, engine):
     node = started_cluster.instances["node1"]
 
     root_namespace = f"clickhouse_{uuid.uuid4()}"
@@ -200,7 +203,7 @@ def test_list_tables(started_cluster):
     for namespace in [namespace_1, namespace_2]:
         assert len(catalog.list_tables(namespace)) == 0
 
-    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine)
 
     tables_list = ""
     for table in namespace_1_tables:
@@ -235,7 +238,8 @@ def test_list_tables(started_cluster):
     )
 
 
-def test_many_namespaces(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_many_namespaces(started_cluster, engine):
     node = started_cluster.instances["node1"]
     root_namespace_1 = f"A_{uuid.uuid4()}"
     root_namespace_2 = f"B_{uuid.uuid4()}"
@@ -256,7 +260,7 @@ def test_many_namespaces(started_cluster):
         for table in tables:
             create_table(catalog, namespace, table)
 
-    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine)
 
     for namespace in namespaces:
         for table in tables:
@@ -268,7 +272,8 @@ def test_many_namespaces(started_cluster):
     )
 
 
-def test_select(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_select(started_cluster, engine):
     node = started_cluster.instances["node1"]
 
     test_ref = f"test_list_tables_{uuid.uuid4()}"
@@ -296,7 +301,7 @@ def test_select(started_cluster):
         df = pa.Table.from_pylist(data)
         table.append(df)
 
-    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine)
 
     expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name)
     assert expected == node.query(
@@ -310,7 +315,8 @@ def test_select(started_cluster):
     assert int(node.query(f"SELECT count() FROM system.iceberg_history WHERE table = '{namespace}.{table_name}' and database = '{CATALOG_NAME}'").strip()) == 1
 
 
-def test_hide_sensitive_info(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_hide_sensitive_info(started_cluster, engine):
     node = started_cluster.instances["node1"]
 
     test_ref = f"test_hide_sensitive_info_{uuid.uuid4()}"
@@ -328,6 +334,7 @@ def test_hide_sensitive_info(started_cluster):
         node,
         CATALOG_NAME,
         additional_settings={"catalog_credential": "SECRET_1"},
+        engine=engine,
     )
     assert "SECRET_1" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}")
 
@@ -336,11 +343,13 @@ def test_hide_sensitive_info(started_cluster):
         node,
         CATALOG_NAME,
         additional_settings={"auth_header": "SECRET_2"},
+        engine=engine,
     )
     assert "SECRET_2" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}")
 
 
-def test_tables_with_same_location(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_tables_with_same_location(started_cluster, engine):
     node = started_cluster.instances["node1"]
 
     test_ref = f"test_tables_with_same_location_{uuid.uuid4()}"
@@ -371,7 +380,7 @@ def record(key):
         df = pa.Table.from_pylist(data)
         table_2.append(df)
 
-    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine)
 
     assert 'aaa\naaa\naaa' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name}`").strip()
     assert 'bbb\nbbb\nbbb' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name_2}`").strip()
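A usage sketch for the alias this patch introduces (not part of the patch itself): it reuses the node, BASE_URL and minio_secret_key fixtures from test.py above, and the warehouse value is illustrative. Once [PATCH 1/6] is applied, `Iceberg` and `DataLakeCatalog` with catalog_type='rest' create identical databases:

# Hedged sketch: both engine names resolve to the same DataLakeCatalog
# implementation; database names and the warehouse setting are illustrative.
node.query(
    f"""
DROP DATABASE IF EXISTS iceberg_alias_demo;
SET allow_experimental_database_iceberg=true;
CREATE DATABASE iceberg_alias_demo ENGINE = Iceberg('{BASE_URL}', 'minio', '{minio_secret_key}')
SETTINGS catalog_type='rest', warehouse='demo'
"""
)
# The original spelling stays valid; using the `Iceberg` name with any
# catalog_type other than 'rest' is now rejected with BAD_ARGUMENTS.
node.query(
    f"""
CREATE DATABASE catalog_demo ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}')
SETTINGS catalog_type='rest', warehouse='demo'
"""
)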
f"test_tables_with_same_location_{uuid.uuid4()}" @@ -371,7 +380,7 @@ def record(key): df = pa.Table.from_pylist(data) table_2.append(df) - create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine) assert 'aaa\naaa\naaa' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name}`").strip() assert 'bbb\nbbb\nbbb' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name_2}`").strip() From f900c066d51a4215475a990f02bad76e6306bb91 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 9 Sep 2025 11:04:56 +0200 Subject: [PATCH 2/6] Send iceberg_metadata_file_path setting to swarm node --- .../DataLakes/DataLakeConfiguration.h | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 221868c0b63c..032d0992b990 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -40,6 +41,7 @@ namespace ErrorCodes namespace DataLakeStorageSetting { extern DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes; + extern const DataLakeStorageSettingsString iceberg_metadata_file_path; } @@ -173,6 +175,42 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl current_metadata->modifyFormatSettings(settings_); } + ASTPtr createArgsWithAccessData() const override + { + auto res = BaseStorageConfiguration::createArgsWithAccessData(); + + auto iceberg_metadata_file_path = (*settings)[DataLakeStorageSetting::iceberg_metadata_file_path]; + + if (iceberg_metadata_file_path.changed) + { + auto * arguments = res->template as(); + if (!arguments) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not an expression list"); + + bool has_settings = false; + + for (auto & arg : arguments->children) + { + if (auto * settings_ast = arg->template as()) + { + has_settings = true; + settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value); + break; + } + } + + if (!has_settings) + { + std::shared_ptr settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value); + arguments->children.push_back(settings_ast); + } + } + + return res; + } + private: DataLakeMetadataPtr current_metadata; LoggerPtr log = getLogger("DataLakeConfiguration"); From 8f133538db9f420a193abd12f58e80502c5fd7e1 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Wed, 14 May 2025 09:52:45 +0200 Subject: [PATCH 3/6] Merge pull request #756 from Altinity/feature/object_storage_remote_initiator Setting object_storage_remote_initiator --- src/Core/Settings.cpp | 4 + src/Core/SettingsChangesHistory.cpp | 2 + src/IO/ReadBufferFromS3.cpp | 6 ++ src/Storages/IStorageCluster.cpp | 82 ++++++++++++++++++- src/Storages/IStorageCluster.h | 13 ++- .../DataLakes/DataLakeConfiguration.h | 15 ++++ .../DataLakes/DeltaLakeMetadata.h | 2 + .../DeltaLakeMetadataDeltaKernel.cpp | 5 ++ .../DataLakes/DeltaLakeMetadataDeltaKernel.h | 2 + .../ObjectStorage/DataLakes/HudiMetadata.cpp | 6 +- .../ObjectStorage/DataLakes/HudiMetadata.h | 3 +- .../DataLakes/IDataLakeMetadata.h | 3 + .../DataLakes/Iceberg/IcebergMetadata.cpp | 7 +- 
From 8f133538db9f420a193abd12f58e80502c5fd7e1 Mon Sep 17 00:00:00 2001
From: Vasily Nemkov
Date: Wed, 14 May 2025 09:52:45 +0200
Subject: [PATCH 3/6] Merge pull request #756 from
 Altinity/feature/object_storage_remote_initiator

Setting object_storage_remote_initiator
---
 src/Core/Settings.cpp                              |  4 +
 src/Core/SettingsChangesHistory.cpp                |  2 +
 src/IO/ReadBufferFromS3.cpp                        |  6 ++
 src/Storages/IStorageCluster.cpp                   | 82 ++++++++++++++++++-
 src/Storages/IStorageCluster.h                     | 13 ++-
 .../DataLakes/DataLakeConfiguration.h              | 15 ++++
 .../DataLakes/DeltaLakeMetadata.h                  |  2 +
 .../DeltaLakeMetadataDeltaKernel.cpp               |  5 ++
 .../DataLakes/DeltaLakeMetadataDeltaKernel.h       |  2 +
 .../ObjectStorage/DataLakes/HudiMetadata.cpp       |  6 +-
 .../ObjectStorage/DataLakes/HudiMetadata.h         |  3 +-
 .../DataLakes/IDataLakeMetadata.h                  |  3 +
 .../DataLakes/Iceberg/IcebergMetadata.cpp          |  7 +-
 .../DataLakes/Iceberg/IcebergMetadata.h            |  9 +-
 .../ObjectStorage/StorageObjectStorage.cpp         | 10 ++-
 .../ObjectStorage/StorageObjectStorage.h           |  4 +-
 .../StorageObjectStorageCluster.cpp                |  3 +-
 .../extractTableFunctionFromSelectQuery.cpp        | 18 +++-
 .../extractTableFunctionFromSelectQuery.h          |  3 +
 src/TableFunctions/TableFunctionRemote.h           |  2 +
 20 files changed, 179 insertions(+), 20 deletions(-)

diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 5070f4fbdfe6..26fb7048d93e 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6880,6 +6880,9 @@ Possible values:
 - '' - do not force any kind of Exchange operators, let the optimizer choose,
 - 'Persisted' - use temporary files in object storage,
 - 'Streaming' - stream exchange data over network.
+)", EXPERIMENTAL) \
+    DECLARE(Bool, object_storage_remote_initiator, false, R"(
+Execute the request to object storage remotely, on one of the object_storage_cluster nodes.
 )", EXPERIMENTAL) \
     \
     /** Experimental timeSeries* aggregate functions. */ \
@@ -6887,6 +6890,7 @@ Possible values:
 Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation.
 )", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \
     \
+    /* ####################################################### */ \
     /* ############ END OF EXPERIMENTAL FEATURES ############# */ \
     /* ####################################################### */ \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index ebc3d26dcf08..8ddf627f3eaa 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -166,6 +166,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"max_os_cpu_wait_time_ratio_to_throw", 0, 0, "New setting"},
         {"query_plan_merge_filter_into_join_condition", false, true, "Added new setting to merge filter into join condition"},
         {"use_local_cache_for_remote_storage", true, false, "Obsolete setting."},
+        {"object_storage_remote_initiator", false, false, "New setting."},
+        {"use_iceberg_metadata_files_cache", true, true, "New setting"},
         {"iceberg_timestamp_ms", 0, 0, "New setting."},
         {"iceberg_snapshot_id", 0, 0, "New setting."},
         {"use_iceberg_metadata_files_cache", true, true, "New setting"},
diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp
index 1a27e98c4ae1..bdbe26342678 100644
--- a/src/IO/ReadBufferFromS3.cpp
+++ b/src/IO/ReadBufferFromS3.cpp
@@ -444,6 +444,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
             log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
             bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
    }
+    else
+    {
+        LOG_TEST(
+            log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
+            bucket, key, version_id.empty() ? "Latest" : version_id);
+    }
 
     ProfileEvents::increment(ProfileEvents::S3GetObject);
     if (client_ptr->isClientForDisk())
"Latest" : version_id); + } ProfileEvents::increment(ProfileEvents::S3GetObject); if (client_ptr->isClientForDisk()) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 866964b4c7dc..25e2d9139c0f 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -1,5 +1,8 @@ #include +#include +#include + #include #include #include @@ -13,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -23,6 +27,9 @@ #include #include #include +#include +#include +#include #include #include @@ -41,6 +48,7 @@ namespace Setting extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; extern const SettingsUInt64 object_storage_max_nodes; + extern const SettingsBool object_storage_remote_initiator; } namespace ErrorCodes @@ -143,15 +151,16 @@ void IStorageCluster::read( storage_snapshot->check(column_names); - updateBeforeRead(context); - auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]); + const auto & settings = context->getSettingsRef(); + + auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) Block sample_block; ASTPtr query_to_send = query_info.query; - if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) + if (settings[Setting::allow_experimental_analyzer]) { sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); } @@ -164,6 +173,17 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); + if (settings[Setting::object_storage_remote_initiator]) + { + auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send); + auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage); + auto modified_query_info = query_info; + modified_query_info.cluster = src_distributed->getCluster(); + auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context); + storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams); + return; + } + RestoreQualifiedNamesVisitor::Data data; data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); @@ -191,6 +211,62 @@ void IStorageCluster::read( query_plan.addStep(std::move(reading)); } +IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( + ClusterPtr cluster, + ContextPtr context, + const std::string & cluster_name_from_settings, + ASTPtr query_to_send) +{ + auto host_addresses = cluster->getShardsAddresses(); + if (host_addresses.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings); + + static pcg64 rng(randomSeed()); + size_t shard_num = rng() % host_addresses.size(); + auto shard_addresses = host_addresses[shard_num]; + /// After getClusterImpl each shard must have exactly 1 replica + if (shard_addresses.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, 
cluster_name_from_settings); + auto host_name = shard_addresses[0].toString(); + + LOG_INFO(log, "Choose remote initiator '{}'", host_name); + + bool secure = shard_addresses[0].secure == Protocol::Secure::Enable; + std::string remote_function_name = secure ? "remoteSecure" : "remote"; + + /// Clean object_storage_remote_initiator setting to avoid infinite remote call + auto new_context = Context::createCopy(context); + new_context->setSetting("object_storage_remote_initiator", false); + + auto * select_query = query_to_send->as(); + if (!select_query) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query"); + + auto query_settings = select_query->settings(); + if (query_settings) + { + auto & settings_ast = query_settings->as(); + if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty()) + { + select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {}); + } + } + + ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send); + if (!table_expression) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression"); + + auto remote_query = makeASTFunction(remote_function_name, std::make_shared(host_name), table_expression->table_function); + + table_expression->table_function = remote_query; + + auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context); + + auto storage = remote_function->execute(query_to_send, new_context, remote_function_name); + + return RemoteCallVariables{storage, new_context}; +} + SinkToStoragePtr IStorageCluster::write( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 5c7c43c61e1a..2dc324a08016 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -58,9 +58,20 @@ class IStorageCluster : public IStorage virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); } protected: - virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + struct RemoteCallVariables + { + StoragePtr storage; + ContextPtr context; + }; + + RemoteCallVariables convertToRemote( + ClusterPtr cluster, + ContextPtr context, + const std::string & cluster_name_from_settings, + ASTPtr query_to_send); + virtual void readFallBackToPure( QueryPlan & /* query_plan */, const Names & /* column_names */, diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 032d0992b990..4768936e3627 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -98,6 +98,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl return std::nullopt; } + std::optional tryGetSamplePathFromMetadata() const override + { + if (!current_metadata) + return std::nullopt; + auto data_files = current_metadata->getDataFiles(); + if (!data_files.empty()) + return data_files[0]; + return std::nullopt; + } + std::optional totalRows(ContextPtr local_context) override { assertInitializedDL(); @@ -525,6 +535,11 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, void assertInitialized() const override { getImpl().assertInitialized(); } + std::optional 
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
index 032d0992b990..4768936e3627 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -98,6 +98,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
         return std::nullopt;
     }
 
+    std::optional<String> tryGetSamplePathFromMetadata() const override
+    {
+        if (!current_metadata)
+            return std::nullopt;
+        auto data_files = current_metadata->getDataFiles();
+        if (!data_files.empty())
+            return data_files[0];
+        return std::nullopt;
+    }
+
     std::optional<UInt64> totalRows(ContextPtr local_context) override
     {
         assertInitializedDL();
@@ -525,6 +535,11 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
     void assertInitialized() const override { getImpl().assertInitialized(); }
 
+    std::optional<String> tryGetSamplePathFromMetadata() const override
+    {
+        return getImpl().tryGetSamplePathFromMetadata();
+    }
+
 private:
     inline StorageObjectStorage::Configuration & getImpl() const
     {
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
index e68a673c740f..a3cf16cd6673 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
@@ -35,6 +35,8 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
 
     DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
 
+    Strings getDataFiles() const override { return data_files; }
+
     NamesAndTypesList getTableSchema() const override { return schema; }
 
     DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; }
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp
index a00568642dc3..e54bc89e3a39 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp
@@ -35,6 +35,11 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
     return table_snapshot->update();
 }
 
+Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
+{
+    throwNotImplemented("getDataFiles()");
+}
+
 ObjectIterator DeltaLakeMetadataDeltaKernel::iterate(
     const ActionsDAG * filter_dag,
     FileProgressCallback callback,
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h
index 679214d5c489..cfc57b791040 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h
@@ -33,6 +33,8 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata
 
     bool update(const ContextPtr & context) override;
 
+    Strings getDataFiles() const override;
+
     NamesAndTypesList getTableSchema() const override;
 
     DB::ReadFromFormatInfo prepareReadingFromFormat(
diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
index 774e39554edc..6398ad34a4b6 100644
--- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
@@ -91,7 +91,7 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv
 {
 }
 
-Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
+Strings HudiMetadata::getDataFiles() const
 {
     if (data_files.empty())
         data_files = getDataFilesImpl();
@@ -99,12 +99,12 @@ Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
 }
 
 ObjectIterator HudiMetadata::iterate(
-    const ActionsDAG * filter_dag,
+    const ActionsDAG * /* filter_dag */,
     FileProgressCallback callback,
     size_t /* list_batch_size */,
     ContextPtr /* context */) const
 {
-    return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
+    return createKeysIterator(getDataFiles(), object_storage, callback);
 }
 
 }
diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
index 7fd94e0d14c4..2c23269b928a 100644
--- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
@@ -19,6 +19,8 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
 
     HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
 
+    Strings getDataFiles() const override;
+
     NamesAndTypesList getTableSchema() const override { return {}; }
 
     bool operator ==(const IDataLakeMetadata & other) const override
@@ -50,7 +52,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
     mutable Strings data_files;
 
     Strings getDataFilesImpl() const;
-    Strings getDataFiles(const ActionsDAG * filter_dag) const;
 };
 
 }
diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
index 39f66faf106f..049a1fa8d60a 100644
--- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
@@ -21,6 +21,9 @@ class IDataLakeMetadata : boost::noncopyable
 
     virtual bool operator==(const IDataLakeMetadata & other) const = 0;
 
+    /// List all data files.
+    /// For better parallelization, prefer the iterate() method.
+    virtual Strings getDataFiles() const = 0;
     /// Return iterator to `data files`.
     using FileProgressCallback = std::function<void(FileProgress)>;
     virtual ObjectIterator iterate(
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
index c2b8caa64c55..5b00717f9107 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -144,7 +144,8 @@ IcebergMetadata::IcebergMetadata(
     Int32 format_version_,
     const Poco::JSON::Object::Ptr & metadata_object_,
     IcebergMetadataFilesCachePtr cache_ptr)
-    : object_storage(std::move(object_storage_))
+    : WithContext(context_)
+    , object_storage(std::move(object_storage_))
     , configuration(std::move(configuration_))
     , schema_processor(IcebergSchemaProcessor())
     , log(getLogger("IcebergMetadata"))
@@ -858,7 +859,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const
     return create_fn();
 }
 
-Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag, ContextPtr local_context) const
+Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const
 {
     bool use_partition_pruning = filter_dag && local_context->getSettingsRef()[Setting::use_iceberg_partition_pruning];
 
@@ -980,7 +981,7 @@ ObjectIterator IcebergMetadata::iterate(
     ContextPtr local_context) const
 {
     SharedLockGuard lock(mutex);
-    return createKeysIterator(getDataFiles(filter_dag, local_context), object_storage, callback);
+    return createKeysIterator(getDataFilesImpl(filter_dag, local_context), object_storage, callback);
 }
 
 NamesAndTypesList IcebergMetadata::getTableSchema() const
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
index 411ffa296d17..0a6d16112625 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
@@ -27,7 +27,7 @@ namespace DB
 {
 
-class IcebergMetadata : public IDataLakeMetadata
+class IcebergMetadata : public IDataLakeMetadata, private WithContext
 {
 public:
     using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
@@ -45,6 +45,11 @@ class IcebergMetadata : public IDataLakeMetadata
         const Poco::JSON::Object::Ptr & metadata_object,
         IcebergMetadataFilesCachePtr cache_ptr);
 
+    /// Get data files. On the first call this reads the manifest_list file and iterates through the manifest files to find all data files.
+    /// While the same snapshot stays current, subsequent calls return the saved list (it cannot change without a metadata file change).
+    /// The cached list is dropped on every snapshot update.
+    Strings getDataFiles() const override { return getDataFilesImpl(nullptr, getContext()); }
+
     /// Get table schema parsed from metadata.
     NamesAndTypesList getTableSchema() const override;
 
@@ -111,7 +116,7 @@ class IcebergMetadata : public IDataLakeMetadata
     mutable std::mutex cached_unprunned_files_for_last_processed_snapshot_mutex;
 
     void updateState(const ContextPtr & local_context, Poco::JSON::Object::Ptr metadata_object, bool metadata_file_changed) TSA_REQUIRES(mutex);
-    Strings getDataFiles(const ActionsDAG * filter_dag, ContextPtr local_context) const;
+    Strings getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const;
 
     void updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex);
     ManifestFileCacheKeys getManifestList(ContextPtr local_context, const String & filename) const;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
index 9ee0ad779f27..bd46d8412649 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -98,7 +98,8 @@ StorageObjectStorage::StorageObjectStorage(
     bool distributed_processing_,
     ASTPtr partition_by_,
     bool is_table_function,
-    bool lazy_init)
+    bool lazy_init,
+    std::optional<std::string> sample_path_)
     : IStorage(table_id_)
     , configuration(configuration_)
     , object_storage(object_storage_)
@@ -145,7 +146,7 @@ StorageObjectStorage::StorageObjectStorage(
     /// (e.g. read always follows constructor immediately).
     update_configuration_on_read_write = !is_table_function || !updated_configuration;
 
-    std::string sample_path;
+    std::string sample_path = sample_path_.value_or("");
 
     ColumnsDescription columns{columns_in_table_or_function_definition};
     if (need_resolve_columns_or_format)
@@ -315,6 +316,11 @@ std::optional<ColumnsDescription> StorageObjectStorage::Configuration::tryGetTab
     throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration");
 }
 
+std::optional<String> StorageObjectStorage::Configuration::tryGetSamplePathFromMetadata() const
+{
+    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetSamplePathFromMetadata is not implemented for basic configuration");
+}
+
 void StorageObjectStorage::read(
     QueryPlan & query_plan,
     const Names & column_names,
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h
index 55a6eaf4f451..721cab5b8c6a 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -78,7 +78,8 @@ class StorageObjectStorage : public IStorage
         bool distributed_processing_ = false,
         ASTPtr partition_by_ = nullptr,
         bool is_table_function_ = false,
-        bool lazy_init = false);
+        bool lazy_init = false,
+        std::optional<std::string> sample_path_ = std::nullopt);
 
     String getName() const override;
 
@@ -288,6 +289,7 @@ class StorageObjectStorage::Configuration
     virtual void initPartitionStrategy(ASTPtr partition_by, const ColumnsDescription & columns, ContextPtr context);
 
     virtual std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const;
+    virtual std::optional<String> tryGetSamplePathFromMetadata() const;
 
     virtual bool supportsFileIterator() const { return false; }
     virtual bool supportsWrites() const { return true; }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 957a0715c8b6..3016e7b014a6 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -166,7 +166,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
         /* distributed_processing */false,
         partition_by,
         /* is_table_function */false,
-        /* lazy_init */lazy_init);
+        /* lazy_init */lazy_init,
+        sample_path);
 
     auto virtuals_ = getVirtualsPtr();
     if (virtuals_)
diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp
index c7f60240b3c7..2f457dadee3b 100644
--- a/src/Storages/extractTableFunctionFromSelectQuery.cpp
+++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp
@@ -9,7 +9,7 @@
 namespace DB
 {
 
-ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query)
+ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query)
 {
     auto * select_query = query->as<ASTSelectQuery>();
     if (!select_query || !select_query->tables())
@@ -17,10 +17,22 @@ ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query)
 
     auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();
     auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();
-    if (!table_expression->table_function)
+    return table_expression;
+}
+
+ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query)
+{
+    auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query);
+    return table_expression ? table_expression->table_function : nullptr;
+}
+
+ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query)
+{
+    auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query);
+    if (!table_function_ast)
         return nullptr;
 
-    auto * table_function = table_expression->table_function->as<ASTFunction>();
+    auto * table_function = table_function_ast->as<ASTFunction>();
     return table_function;
 }
diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h
index 87edf01c1c82..9834f3dc7573 100644
--- a/src/Storages/extractTableFunctionFromSelectQuery.h
+++ b/src/Storages/extractTableFunctionFromSelectQuery.h
@@ -6,7 +6,10 @@
 namespace DB
 {
+struct ASTTableExpression;
 
+ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query);
+ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query);
 ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query);
 ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);
diff --git a/src/TableFunctions/TableFunctionRemote.h b/src/TableFunctions/TableFunctionRemote.h
index 0f75bf2b854c..4de60a79aea3 100644
--- a/src/TableFunctions/TableFunctionRemote.h
+++ b/src/TableFunctions/TableFunctionRemote.h
@@ -26,6 +26,8 @@ class TableFunctionRemote : public ITableFunction
 
     bool needStructureConversion() const override { return false; }
 
+    void setRemoteTableFunction(ASTPtr remote_table_function_ptr_) { remote_table_function_ptr = remote_table_function_ptr_; }
+
 private:
     StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
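A hedged sketch of the new setting in use (cluster name, URL and credentials are placeholders, not from the patch). With object_storage_remote_initiator = 1, the node that receives the query no longer plans the read itself: convertToRemote() picks one random replica of the object storage cluster, wraps the table function into remote()/remoteSecure(), and forwards initiation there, clearing the setting first so the call cannot recurse:

# Hedged sketch: 'swarm' and the icebergS3Cluster arguments are illustrative.
node.query(
    """
    SELECT count()
    FROM icebergS3Cluster('swarm', 'http://minio:9000/warehouse/data', 'minio', 'minio123')
    SETTINGS object_storage_remote_initiator = 1
    """
)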
From 146218a195846c85f8df69b91c763d70764f2115 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 9 Sep 2025 12:09:16 +0200
Subject: [PATCH 4/6] Fix hide password for Iceberg database

---
 src/Parsers/FunctionSecretArgumentsFinder.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Parsers/FunctionSecretArgumentsFinder.h b/src/Parsers/FunctionSecretArgumentsFinder.h
index f145760a3a26..9ab6689637d0 100644
--- a/src/Parsers/FunctionSecretArgumentsFinder.h
+++ b/src/Parsers/FunctionSecretArgumentsFinder.h
@@ -716,7 +716,7 @@ class FunctionSecretArgumentsFinder
             /// S3('url', 'access_key_id', 'secret_access_key')
             findS3DatabaseSecretArguments();
         }
-        else if (engine_name == "DataLakeCatalog")
+        else if (engine_name == "DataLakeCatalog" || engine_name == "Iceberg")
        {
             findDataLakeCatalogSecretArguments();
         }
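A hedged check in the spirit of test_hide_sensitive_info, showing what this one-line fix covers; the database name, URL and secret are illustrative. Before the fix the secret could leak for the alias spelling, because only the DataLakeCatalog engine name was matched:

# Hedged sketch: with PATCH 4/6 applied, SHOW CREATE DATABASE must mask
# credentials for the `Iceberg` alias exactly as for DataLakeCatalog.
node.query(
    "CREATE DATABASE iceberg_secret_demo "
    "ENGINE = Iceberg('http://rest:8181/v1', 'minio', 'SECRET_KEY') "
    "SETTINGS catalog_type = 'rest'"
)
assert "SECRET_KEY" not in node.query("SHOW CREATE DATABASE iceberg_secret_demo")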
From c580ca98c6c83b71ed14197841bfafced438685f Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 6 May 2025 13:37:47 +0200
Subject: [PATCH 5/6] Prune table in icebergCluster functions

---
 src/Storages/IStorageCluster.cpp                  |  2 +-
 src/Storages/IStorageCluster.h                    |  1 +
 .../ObjectStorage/StorageObjectStorageCluster.cpp |  3 ++-
 .../ObjectStorage/StorageObjectStorageCluster.h   |  1 +
 src/Storages/StorageDistributed.cpp               |  2 +-
 src/Storages/StorageFileCluster.cpp               |  1 +
 src/Storages/StorageFileCluster.h                 |  1 +
 src/Storages/StorageReplicatedMergeTree.cpp       |  2 +-
 src/Storages/StorageURLCluster.cpp                |  1 +
 src/Storages/StorageURLCluster.h                  |  1 +
 tests/integration/test_storage_iceberg/test.py    | 14 ++++++++++----
 11 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 25e2d9139c0f..3a0e05ab0911 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -127,7 +127,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
     if (extension)
         return;
 
-    extension = storage->getTaskIteratorExtension(predicate, context, cluster);
+    extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster);
 }
 
 /// The code executes on initiator
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 2dc324a08016..6c15e7ff370b 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -44,6 +44,7 @@ class IStorageCluster : public IStorage
     /// Query is needed for pruning by virtual columns (_file, _path)
     virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr cluster) const = 0;
 
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 3016e7b014a6..4de061a6705d 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -441,12 +441,13 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
 
 RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & filter_actions_dag,
     const ContextPtr & local_context,
     ClusterPtr cluster) const
 {
     auto iterator = StorageObjectStorageSource::createFileIterator(
         configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
-        local_context, predicate, {}, virtual_columns, hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);
+        local_context, predicate, filter_actions_dag, virtual_columns, hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);
 
     std::vector<String> ids_of_hosts;
     for (const auto & shard : cluster->getShardsInfo())
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index 4b9b2239b7d2..8dab48dbaa59 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -30,6 +30,7 @@ class StorageObjectStorageCluster : public IStorageCluster
 
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr cluster) const override;
 
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index 0ae57212024d..c5a89813f79c 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -1317,7 +1317,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
     const auto cluster = getCluster();
 
     /// Select query is needed for pruining on virtual columns
-    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);
+    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, filter, local_context, cluster);
 
     /// Here we take addresses from destination cluster and assume source table exists on these nodes
     size_t replica_index = 0;
diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp
index 9fe7b164a7a7..de825df3a255 100644
--- a/src/Storages/StorageFileCluster.cpp
+++ b/src/Storages/StorageFileCluster.cpp
@@ -96,6 +96,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
 
 RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h
index 5fb08a48eec6..354285602ec4 100644
--- a/src/Storages/StorageFileCluster.h
+++ b/src/Storages/StorageFileCluster.h
@@ -29,6 +29,7 @@ class StorageFileCluster : public IStorageCluster
     std::string getName() const override { return "FileCluster"; }
 
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr) const override;
 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 9ec0231a8c71..565d0d3fb5f3 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -6086,7 +6086,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
     ContextMutablePtr query_context = Context::createCopy(local_context);
     query_context->increaseDistributedDepth();
 
-    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);
+    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, {}, local_context, src_cluster);
 
     size_t replica_index = 0;
     for (const auto & replicas : src_cluster->getShardsAddresses())
diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp
index 13be67789df8..5570392477e8 100644
--- a/src/Storages/StorageURLCluster.cpp
+++ b/src/Storages/StorageURLCluster.cpp
@@ -128,6 +128,7 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS
 
 RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h
index e360eb22d701..2fefa108c965 100644
--- a/src/Storages/StorageURLCluster.h
+++ b/src/Storages/StorageURLCluster.h
@@ -32,6 +32,7 @@ class StorageURLCluster : public IStorageCluster
     std::string getName() const override { return "URLCluster"; }
 
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr) const override;
 
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index b131ce8aa36e..5354f0958aa2 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -2235,7 +2235,10 @@ def check_validity_and_get_prunned_files_general(instance, table_name, settings1
 
 
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_partition_pruning(started_cluster, storage_type):
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_partition_pruning(started_cluster, storage_type, run_on_cluster):
+    if run_on_cluster and storage_type == "local":
+        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_partition_pruning_" + storage_type + "_" + get_uuid_str()
@@ -2282,7 +2285,7 @@ def execute_spark_query(query: str):
     )
 
     creation_expression = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
     )
 
     def check_validity_and_get_prunned_files(select_expression):
@@ -3088,7 +3091,10 @@ def test_explicit_metadata_file(started_cluster, storage_type):
 
     create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="../metadata/v11.metadata.json")
 
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_minmax_pruning_with_null(started_cluster, storage_type):
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster):
+    if run_on_cluster and storage_type == "local":
+        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str()
@@ -3158,7 +3164,7 @@ def execute_spark_query(query: str):
     )
 
     creation_expression = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
     )
 
     def check_validity_and_get_prunned_files(select_expression):
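A hedged sketch of what the parametrization above verifies: with filter_actions_dag now passed through getTaskIteratorExtension(), partition pruning also applies when the file iterator is built on the cluster initiator. The ProfileEvents name and the filter column are patterned on the check_validity_and_get_prunned_files helpers in this test file and may differ in the actual build:

# Hedged sketch reusing the instance, creation_expression and get_uuid_str
# names from the test above; the event name and WHERE clause are illustrative.
query_id = "pruning_demo_" + get_uuid_str()
instance.query(
    f"SELECT * FROM {creation_expression} WHERE tag > 50 FORMAT Null",
    query_id=query_id,
)
instance.query("SYSTEM FLUSH LOGS")
pruned_files = instance.query(
    f"""
    SELECT ProfileEvents['IcebergPartitionPrunedFiles']
    FROM system.query_log
    WHERE query_id = '{query_id}' AND type = 'QueryFinish'
    """
)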
storage_type + "_" + get_uuid_str() @@ -2282,7 +2285,7 @@ def execute_spark_query(query: str): ) creation_expression = get_creation_expression( - storage_type, TABLE_NAME, started_cluster, table_function=True + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster ) def check_validity_and_get_prunned_files(select_expression): @@ -3088,7 +3091,10 @@ def test_explicit_metadata_file(started_cluster, storage_type): create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="../metadata/v11.metadata.json") @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) -def test_minmax_pruning_with_null(started_cluster, storage_type): +@pytest.mark.parametrize("run_on_cluster", [False, True]) +def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster): + if run_on_cluster and storage_type == "local": + pytest.skip("Local storage is not supported on cluster") instance = started_cluster.instances["node1"] spark = started_cluster.spark_session TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str() @@ -3158,7 +3164,7 @@ def execute_spark_query(query: str): ) creation_expression = get_creation_expression( - storage_type, TABLE_NAME, started_cluster, table_function=True + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster ) def check_validity_and_get_prunned_files(select_expression): From 33858638333c23c604d0a60eb399297d5816a172 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 9 Sep 2025 19:16:26 +0200 Subject: [PATCH 6/6] Fix settings changes history --- src/Core/SettingsChangesHistory.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 8ddf627f3eaa..8849fb3666d2 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -76,6 +76,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, + {"object_storage_remote_initiator", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { @@ -166,8 +167,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"max_os_cpu_wait_time_ratio_to_throw", 0, 0, "New setting"}, {"query_plan_merge_filter_into_join_condition", false, true, "Added new setting to merge filter into join condition"}, {"use_local_cache_for_remote_storage", true, false, "Obsolete setting."}, - {"object_storage_remote_initiator", false, false, "New setting."}, - {"use_iceberg_metadata_files_cache", true, true, "New setting"}, {"iceberg_timestamp_ms", 0, 0, "New setting."}, {"iceberg_snapshot_id", 0, 0, "New setting."}, {"use_iceberg_metadata_files_cache", true, true, "New setting"},