diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index ec447c0738ab..8a292d0c5778 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -72,7 +72,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
     if (extension)
         return;

-    extension = storage->getTaskIteratorExtension(predicate, context, cluster);
+    extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster);
 }

 /// The code executes on initiator
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index e66ba8222b8a..7f5d48faa2b8 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -43,6 +43,7 @@ class IStorageCluster : public IStorage
     /// Query is needed for pruning by virtual columns (_file, _path)
     virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr cluster) const = 0;

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 58143a0bbefd..f55e507ed55e 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -362,12 +362,13 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(

 RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & filter_actions_dag,
     const ContextPtr & local_context,
     ClusterPtr cluster) const
 {
     auto iterator = StorageObjectStorageSource::createFileIterator(
         configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
-        local_context, predicate, {}, getVirtualsList(), nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true);
+        local_context, predicate, filter_actions_dag, getVirtualsList(), nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true);

     std::vector<std::string> ids_of_hosts;
     for (const auto & shard : cluster->getShardsInfo())
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index fd4270518976..ceb830abce45 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -31,6 +31,7 @@ class StorageObjectStorageCluster : public IStorageCluster

     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr cluster) const override;

diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index 3142a579ef4e..a7a3cc94ad06 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -1164,7 +1164,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
     const auto cluster = getCluster();

     /// Select query is needed for pruining on virtual columns
-    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);
+    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, filter, local_context, cluster);

     /// Here we take addresses from destination cluster and assume source table exists on these nodes
     size_t replica_index = 0;
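Note on the interface change above: `getTaskIteratorExtension` now receives the full filter `ActionsDAG` alongside the single predicate node, and `StorageObjectStorageCluster` forwards it into `createFileIterator` (where `{}` was previously hardcoded), so the initiator's task iterator can prune files using the whole filter rather than only the virtual-column predicate. A minimal self-contained C++ sketch of the pattern, using simplified stand-in types rather than ClickHouse's actual headers:

    #include <functional>
    #include <memory>
    #include <optional>
    #include <string>
    #include <vector>

    // Illustrative stand-ins for ClickHouse types.
    struct ActionsDAG { struct Node {}; };
    struct Extension { std::function<std::string()> next_task; };

    // The interface shape after this change: the full filter DAG travels
    // alongside the single predicate node; implementations may use either.
    struct IStorageClusterLike
    {
        virtual Extension getTaskIteratorExtension(
            const ActionsDAG::Node * predicate,
            const std::optional<ActionsDAG> & filter_actions_dag) const = 0;
        virtual ~IStorageClusterLike() = default;
    };

    // Mirrors what StorageObjectStorageCluster does: build a pruned file list
    // once on the initiator, then hand out files one task at a time.
    struct ObjectStorageClusterLike : IStorageClusterLike
    {
        Extension getTaskIteratorExtension(
            const ActionsDAG::Node * predicate,
            const std::optional<ActionsDAG> & filter_actions_dag) const override
        {
            auto files = std::make_shared<std::vector<std::string>>(
                listFilesAfterPruning(predicate, filter_actions_dag));
            auto pos = std::make_shared<size_t>(0);
            return Extension{[files, pos]() -> std::string
            {
                // Empty string signals "no more tasks" in this sketch.
                return *pos < files->size() ? (*files)[(*pos)++] : "";
            }};
        }

    private:
        // Hypothetical helper: prune using whatever filter info is available.
        std::vector<std::string> listFilesAfterPruning(
            const ActionsDAG::Node *, const std::optional<ActionsDAG> &) const
        {
            return {"part-0.parquet", "part-1.parquet"};
        }
    };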
diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp
index d70e92fa3b45..0ee356c7b88e 100644
--- a/src/Storages/StorageFileCluster.cpp
+++ b/src/Storages/StorageFileCluster.cpp
@@ -79,6 +79,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto

 RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h
index 49d39a24ceba..93627fdc31d6 100644
--- a/src/Storages/StorageFileCluster.h
+++ b/src/Storages/StorageFileCluster.h
@@ -29,6 +29,7 @@ class StorageFileCluster : public IStorageCluster
     std::string getName() const override { return "FileCluster"; }

     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr) const override;
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index e051594fa8e3..6b8353d25556 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -6013,7 +6013,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
     ContextMutablePtr query_context = Context::createCopy(local_context);
     query_context->increaseDistributedDepth();

-    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);
+    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, {}, local_context, src_cluster);

     size_t replica_index = 0;
     for (const auto & replicas : src_cluster->getShardsAddresses())
diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp
index 7ba7f22c62e8..9253e6bec0c6 100644
--- a/src/Storages/StorageURLCluster.cpp
+++ b/src/Storages/StorageURLCluster.cpp
@@ -116,6 +116,7 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS

 RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h
index 9bfbaffe30f8..884ca16e7b12 100644
--- a/src/Storages/StorageURLCluster.h
+++ b/src/Storages/StorageURLCluster.h
@@ -32,6 +32,7 @@ class StorageURLCluster : public IStorageCluster
     std::string getName() const override { return "URLCluster"; }

     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr) const override;
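Two conventions worth noting in the sections above: storages that do not use the new argument yet (StorageFileCluster, StorageURLCluster) keep the parameter in the signature but comment out its name to avoid unused-parameter warnings, and call sites with no filter at hand (StorageReplicatedMergeTree) pass `{}`, which constructs an empty `std::optional<ActionsDAG>`. A small compilable sketch of both sides, again with stand-in types:

    #include <iostream>
    #include <optional>

    // Illustrative stand-in for the real ActionsDAG.
    struct ActionsDAG { int num_nodes = 0; };

    // Implementation side: the parameter stays part of the interface but is
    // intentionally unused, so only its type is spelled out.
    void getTaskIteratorExtension(const std::optional<ActionsDAG> & /* filter_actions_dag */)
    {
        // A File/URL-style storage would build its iterator from the
        // predicate node alone for now.
    }

    int main()
    {
        // Caller side: {} yields std::nullopt, i.e. "no filter available",
        // matching the StorageReplicatedMergeTree call site above.
        getTaskIteratorExtension({});

        // A caller that does have a filter passes the populated optional.
        std::optional<ActionsDAG> dag = ActionsDAG{3};
        getTaskIteratorExtension(dag);
        std::cout << (dag ? "have filter\n" : "no filter\n");
    }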
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 8b4a2008d9c0..676613232f14 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -580,7 +580,7 @@ def test_types(started_cluster, format_version, storage_type):
         [
             ["a", "Nullable(Int32)"],
             ["b", "Nullable(String)"],
-            ["c", "Nullable(Date32)"],
+            ["c", "Nullable(Date)"],
             ["d", "Array(Nullable(String))"],
             ["e", "Nullable(Bool)"],
         ]
@@ -603,7 +603,7 @@ def test_types(started_cluster, format_version, storage_type):
         [
             ["a", "Nullable(Int32)"],
             ["b", "Nullable(String)"],
-            ["c", "Nullable(Date32)"],
+            ["c", "Nullable(Date)"],
             ["d", "Array(Nullable(String))"],
             ["e", "Nullable(Bool)"],
         ]
@@ -2134,7 +2134,10 @@ def test_filesystem_cache(started_cluster, storage_type):


 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_partition_pruning(started_cluster, storage_type):
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_partition_pruning(started_cluster, storage_type, run_on_cluster):
+    if run_on_cluster and storage_type == "local":
+        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_partition_pruning_" + storage_type + "_" + get_uuid_str()
@@ -2182,7 +2185,7 @@ def execute_spark_query(query: str):
     )

     creation_expression = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
     )

     def check_validity_and_get_prunned_files(select_expression):
@@ -3078,7 +3081,10 @@ def test_explicit_metadata_file(started_cluster, storage_type):


 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_minmax_pruning_with_null(started_cluster, storage_type):
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster):
+    if run_on_cluster and storage_type == "local":
+        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str()
@@ -3149,7 +3155,7 @@ def execute_spark_query(query: str):
     )

     creation_expression = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
     )

     def check_validity_and_get_prunned_files(select_expression):
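The new `run_on_cluster` parametrization above runs the pruning tests both directly and through the cluster table function (skipping local storage, which is not supported on cluster), checking that pruned-file counts match in both modes. For intuition, this is min/max pruning in miniature: a data file is skipped when its column statistics cannot possibly satisfy the filter. A standalone C++ sketch of the idea; the types and numbers are made up for illustration, and the real tests measure this via check_validity_and_get_prunned_files:

    #include <iostream>
    #include <string>
    #include <vector>

    // Illustrative stand-in: per-file min/max statistics for one column.
    struct FileStats
    {
        std::string path;
        int min_value;
        int max_value;
    };

    // Keep a file only if some row in [min, max] can satisfy `col <= bound`;
    // files whose minimum already exceeds the bound are pruned.
    std::vector<FileStats> pruneByMinMax(const std::vector<FileStats> & files, int bound)
    {
        std::vector<FileStats> kept;
        for (const auto & f : files)
            if (f.min_value <= bound)
                kept.push_back(f);
        return kept;
    }

    int main()
    {
        std::vector<FileStats> files = {
            {"data/part-0.parquet", 0, 9},
            {"data/part-1.parquet", 10, 19},
            {"data/part-2.parquet", 20, 29},
        };

        auto kept = pruneByMinMax(files, 12);
        std::cout << "pruned files: " << files.size() - kept.size() << "\n";  // prints 1
    }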