diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index f326537e7a4d..9f9bd9c722e2 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6135,8 +6135,12 @@ Possible values: /** Experimental tsToGrid aggregate function. */ \ DECLARE(Bool, allow_experimental_ts_to_grid_aggregate_function, false, R"( Experimental tsToGrid aggregate function for Prometheus-like timeseries resampling. Cloud only +)", EXPERIMENTAL) \ + DECLARE(Bool, object_storage_remote_initiator, false, R"( +Execute requests to object storage remotely, on one of the object_storage_cluster nodes. )", EXPERIMENTAL) \ \ + /* ####################################################### */ \ /* ############ END OF EXPERIMENTAL FEATURES ############# */ \ /* ####################################################### */ \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 3768b85208ab..9c9bee4158e5 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -87,6 +87,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_database_unity_catalog", false, false, "Allow experimental database engine DataLakeCatalog with catalog_type = 'unity'"}, {"allow_experimental_database_glue_catalog", false, false, "Allow experimental database engine DataLakeCatalog with catalog_type = 'glue'"}, {"use_page_cache_with_distributed_cache", false, false, "New setting"}, + {"object_storage_remote_initiator", false, false, "New setting."}, {"use_iceberg_metadata_files_cache", true, true, "New setting"}, {"use_query_condition_cache", false, false, "New setting."}, {"iceberg_timestamp_ms", 0, 0, "New setting."},
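The new setting is an ordinary per-query Bool: the initiator reads it from the query context and, when it is enabled, forwards the whole query to one randomly chosen node of the object storage cluster instead of scheduling file tasks itself. Below is a minimal standalone sketch of that gating pattern; the types and names are illustrative stand-ins, not ClickHouse's real Settings machinery.

```cpp
// Standalone sketch (toy types, illustrative names) of the gating pattern:
// a per-query boolean setting decides whether the initiator runs the
// object-storage query itself or forwards it to a randomly chosen cluster node.
#include <iostream>
#include <map>
#include <random>
#include <string>
#include <vector>

struct QuerySettings
{
    std::map<std::string, bool> bool_settings;

    bool get(const std::string & name) const
    {
        auto it = bool_settings.find(name);
        return it != bool_settings.end() && it->second;
    }
};

/// Pick one node of the cluster uniformly at random to act as the initiator.
std::string chooseRemoteInitiator(const std::vector<std::string> & nodes, std::mt19937_64 & rng)
{
    return nodes[rng() % nodes.size()];
}

int main()
{
    QuerySettings settings{{{"object_storage_remote_initiator", true}}};
    std::vector<std::string> cluster{"node1:9000", "node2:9000", "node3:9000"};
    std::mt19937_64 rng(std::random_device{}());

    if (settings.get("object_storage_remote_initiator"))
        std::cout << "forwarding query to " << chooseRemoteInitiator(cluster, rng) << '\n';
    else
        std::cout << "executing on the local initiator\n";
}
```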
"Latest" : version_id); + } ProfileEvents::increment(ProfileEvents::S3GetObject); if (client_ptr->isClientForDisk()) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index ec447c0738ab..9c114940967d 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -1,5 +1,8 @@ #include +#include +#include + #include #include #include @@ -12,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -21,6 +25,9 @@ #include #include #include +#include +#include +#include #include #include @@ -39,6 +46,7 @@ namespace Setting extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; extern const SettingsUInt64 object_storage_max_nodes; + extern const SettingsBool object_storage_remote_initiator; } namespace ErrorCodes @@ -72,7 +80,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) if (extension) return; - extension = storage->getTaskIteratorExtension(predicate, context, cluster); + extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster); } /// The code executes on initiator @@ -96,15 +104,16 @@ void IStorageCluster::read( storage_snapshot->check(column_names); - updateBeforeRead(context); - auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]); + const auto & settings = context->getSettingsRef(); + + auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) Block sample_block; ASTPtr query_to_send = query_info.query; - if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) + if (settings[Setting::allow_experimental_analyzer]) { sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); } @@ -117,6 +126,17 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); + if (settings[Setting::object_storage_remote_initiator]) + { + auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send); + auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage); + auto modified_query_info = query_info; + modified_query_info.cluster = src_distributed->getCluster(); + auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context); + storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams); + return; + } + RestoreQualifiedNamesVisitor::Data data; data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); @@ -144,6 +164,62 @@ void IStorageCluster::read( query_plan.addStep(std::move(reading)); } +IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( + ClusterPtr cluster, + ContextPtr context, + const std::string & cluster_name_from_settings, + ASTPtr query_to_send) +{ + auto host_addresses = cluster->getShardsAddresses(); + if (host_addresses.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings); + + static pcg64 rng(randomSeed()); + 
size_t shard_num = rng() % host_addresses.size(); + auto shard_addresses = host_addresses[shard_num]; + /// After getClusterImpl each shard must have exactly 1 replica + if (shard_addresses.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings); + auto host_name = shard_addresses[0].toString(); + + LOG_INFO(log, "Choose remote initiator '{}'", host_name); + + bool secure = shard_addresses[0].secure == Protocol::Secure::Enable; + std::string remote_function_name = secure ? "remoteSecure" : "remote"; + + /// Clean object_storage_remote_initiator setting to avoid infinite remote call + auto new_context = Context::createCopy(context); + new_context->setSetting("object_storage_remote_initiator", false); + + auto * select_query = query_to_send->as(); + if (!select_query) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query"); + + auto query_settings = select_query->settings(); + if (query_settings) + { + auto & settings_ast = query_settings->as(); + if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty()) + { + select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {}); + } + } + + ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send); + if (!table_expression) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression"); + + auto remote_query = makeASTFunction(remote_function_name, std::make_shared(host_name), table_expression->table_function); + + table_expression->table_function = remote_query; + + auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context); + + auto storage = remote_function->execute(query_to_send, new_context, remote_function_name); + + return RemoteCallVariables{storage, new_context}; +} + SinkToStoragePtr IStorageCluster::write( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index e66ba8222b8a..086bf185f0d6 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -43,6 +43,7 @@ class IStorageCluster : public IStorage /// Query is needed for pruning by virtual columns (_file, _path) virtual RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & context, ClusterPtr cluster) const = 0; @@ -57,9 +58,20 @@ class IStorageCluster : public IStorage virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); } protected: - virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + struct RemoteCallVariables + { + StoragePtr storage; + ContextPtr context; + }; + + RemoteCallVariables convertToRemote( + ClusterPtr cluster, + ContextPtr context, + const std::string & cluster_name_from_settings, + ASTPtr query_to_send); + virtual void readFallBackToPure( QueryPlan & /* query_plan */, const Names & /* column_names */, diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index c04b0d9b57e1..e4e557edc18f 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -85,6 +85,16 @@ 
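convertToRemote rewrites the query's FROM clause so the chosen node becomes the initiator: the original table function is wrapped into remote()/remoteSecure() with the host as the first argument. A toy sketch of that wrapping on a simplified tree follows; FunctionNode is a stand-in, not ClickHouse's ASTFunction API.

```cpp
// Toy sketch of the rewrite convertToRemote performs: the table function in
// the FROM clause is wrapped into remote()/remoteSecure() with the chosen
// host as the first argument, so the query executes on that node.
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct FunctionNode
{
    std::string name;                                 /// e.g. "icebergS3"
    std::string literal;                              /// set for leaf literal arguments
    std::vector<std::shared_ptr<FunctionNode>> args;  /// nested arguments

    std::string dump() const
    {
        if (!literal.empty())
            return "'" + literal + "'";
        std::string out = name + "(";
        for (size_t i = 0; i < args.size(); ++i)
            out += (i ? ", " : "") + args[i]->dump();
        return out + ")";
    }
};

/// Wrap an existing table function into remote('host', <table function>).
std::shared_ptr<FunctionNode> wrapIntoRemote(
    std::shared_ptr<FunctionNode> table_function, const std::string & host, bool secure)
{
    auto host_arg = std::make_shared<FunctionNode>();
    host_arg->literal = host;

    auto remote = std::make_shared<FunctionNode>();
    remote->name = secure ? "remoteSecure" : "remote";
    remote->args = {std::move(host_arg), std::move(table_function)};
    return remote;
}

int main()
{
    auto table_function = std::make_shared<FunctionNode>();
    table_function->name = "icebergS3";

    /// Prints: remote('node2:9000', icebergS3())
    std::cout << wrapIntoRemote(table_function, "node2:9000", /*secure=*/false)->dump() << '\n';
}
```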
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index c04b0d9b57e1..e4e557edc18f 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -85,6 +85,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl return std::nullopt; } + std::optional tryGetSamplePathFromMetadata() const override + { + if (!current_metadata) + return std::nullopt; + auto data_files = current_metadata->getDataFiles(); + if (!data_files.empty()) + return data_files[0]; + return std::nullopt; + } + std::optional totalRows() override { if (!current_metadata) @@ -280,6 +290,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, std::string getEngineName() const override { return getImpl().getEngineName(); } std::string getNamespaceType() const override { return getImpl().getNamespaceType(); } + Path getFullPath() const override { return getImpl().getFullPath(); } Path getPath() const override { return getImpl().getPath(); } void setPath(const Path & path) override { getImpl().setPath(path); } @@ -296,9 +307,14 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) override { getImpl().addStructureAndFormatToArgsIfNeeded(args, structure_, format_, context, with_structure); } + bool withPartitionWildcard() const override { return getImpl().withPartitionWildcard(); } + bool withGlobsIgnorePartitionWildcard() const override { return getImpl().withGlobsIgnorePartitionWildcard(); } + bool isPathWithGlobs() const override { return getImpl().isPathWithGlobs(); } + bool isNamespaceWithGlobs() const override { return getImpl().isNamespaceWithGlobs(); } std::string getPathWithoutGlobs() const override { return getImpl().getPathWithoutGlobs(); } bool isArchive() const override { return getImpl().isArchive(); } + bool isPathInArchiveWithGlobs() const override { return getImpl().isPathInArchiveWithGlobs(); } std::string getPathInArchive() const override { return getImpl().getPathInArchive(); } void check(ContextPtr context) const override { getImpl().check(context); } @@ -340,8 +356,19 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, std::optional tryGetTableStructureFromMetadata() const override { return getImpl().tryGetTableStructureFromMetadata(); } + bool supportsFileIterator() const override { return getImpl().supportsFileIterator(); } + ObjectIterator iterate( + const ActionsDAG * filter_dag, + std::function callback, + size_t list_batch_size) override + { + return getImpl().iterate(filter_dag, callback, list_batch_size); + } + void update(ObjectStoragePtr object_storage, ContextPtr local_context) override { return getImpl().update(object_storage, local_context); } + void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context) override + { return getImpl().updateIfRequired(object_storage, local_context); } void initialize( ASTs & engine_args, @@ -366,7 +393,6 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, void setCompressionMethod(const String & compression_method_) override { getImpl().setCompressionMethod(compression_method_); } void setStructure(const String & structure_) override { getImpl().setStructure(structure_); } -protected: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override { return getImpl().fromNamedCollection(collection, context); } void fromAST(ASTs & args, ContextPtr context, bool with_structure) override @@ -449,6 +475,13 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, createDynamicStorage(type); } +
std::optional tryGetSamplePathFromMetadata() const override + { + return getImpl().tryGetSamplePathFromMetadata(); + } + + void assertInitialized() const override { return getImpl().assertInitialized(); } + private: inline StorageObjectStorage::Configuration & getImpl() const {
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index 7835466e5490..580a9645361d 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -41,6 +41,8 @@ class DeltaLakeMetadata final : public IDataLakeMetadata DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); + Strings getDataFiles() const override { return data_files; } + NamesAndTypesList getTableSchema() const override { return schema; } DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; }
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp index dd411f9890d2..1fa3f6636729 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp @@ -32,6 +32,11 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &) return table_snapshot->update(); } +Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const +{ + throwNotImplemented("getDataFiles()"); +} + ObjectIterator DeltaLakeMetadataDeltaKernel::iterate( const ActionsDAG * filter_dag, FileProgressCallback callback,
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h index ececd3d73e7e..88a733b75434 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h @@ -40,6 +40,8 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata bool update(const ContextPtr & context) override; + Strings getDataFiles() const override; + NamesAndTypesList getTableSchema() const override; NamesAndTypesList getReadSchema() const override;
diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp index cdfe432b7e62..d028da51799f 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp @@ -91,7 +91,7 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv { } -Strings HudiMetadata::getDataFiles(const ActionsDAG *) const +Strings HudiMetadata::getDataFiles() const { if (data_files.empty()) data_files = getDataFilesImpl(); @@ -99,11 +99,11 @@ Strings HudiMetadata::getDataFiles(const ActionsDAG *) const } ObjectIterator HudiMetadata::iterate( - const ActionsDAG * filter_dag, + const ActionsDAG * /* filter_dag */, FileProgressCallback callback, size_t /* list_batch_size */) const { - return createKeysIterator(getDataFiles(filter_dag), object_storage, callback); + return createKeysIterator(getDataFiles(), object_storage, callback); } }
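The changes above split data-file listing into two styles: DeltaLakeMetadataDeltaKernel deliberately rejects the eager getDataFiles() and supports only iteration, while Hudi caches the eager list. A small self-contained sketch contrasting the two access patterns, with illustrative names rather than the real IDataLakeMetadata interface:

```cpp
// Sketch contrasting the two listing styles: an eager getDataFiles() that
// materializes the whole file list up front, and a pull-based iterator that
// lets a scheduler hand out files one at a time (better for parallelization).
#include <iostream>
#include <optional>
#include <string>
#include <vector>

using Strings = std::vector<std::string>;

class IMetadata
{
public:
    virtual ~IMetadata() = default;

    /// Eager listing: simple, but builds the complete list in memory.
    virtual Strings getDataFiles() const = 0;

    /// Pull-based listing: consumers can start working before the
    /// full listing is known.
    virtual std::optional<std::string> next() = 0;
};

class ToyMetadata : public IMetadata
{
public:
    explicit ToyMetadata(Strings files_) : files(std::move(files_)) {}

    Strings getDataFiles() const override { return files; }

    std::optional<std::string> next() override
    {
        if (pos >= files.size())
            return std::nullopt;
        return files[pos++];
    }

private:
    Strings files;
    size_t pos = 0;
};

int main()
{
    ToyMetadata metadata({"part-0.parquet", "part-1.parquet"});
    while (auto file = metadata.next())
        std::cout << *file << '\n';
}
```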
diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h index a64ecfeb55dd..5b515dc1f37e 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h @@ -19,6 +19,8 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); + Strings getDataFiles() const override; + NamesAndTypesList getTableSchema() const override { return {}; } bool operator ==(const IDataLakeMetadata & other) const override @@ -49,7 +51,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext mutable Strings data_files; Strings getDataFilesImpl() const; - Strings getDataFiles(const ActionsDAG * filter_dag) const; }; }
diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index fe1fa151b9a0..1984750351bc 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -20,6 +20,9 @@ class IDataLakeMetadata : boost::noncopyable virtual bool operator==(const IDataLakeMetadata & other) const = 0; + /// List all data files. + /// For better parallelization, prefer the iterate() method. + virtual Strings getDataFiles() const = 0; /// Return iterator to `data files`. using FileProgressCallback = std::function; virtual ObjectIterator iterate(
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index cfa834687231..0466131cc985 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -609,7 +609,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64 return create_fn(); } -Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const +Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const { if (!relevant_snapshot) return {}; @@ -716,7 +716,7 @@ ObjectIterator IcebergMetadata::iterate( FileProgressCallback callback, size_t /* list_batch_size */) const { - return createKeysIterator(getDataFiles(filter_dag), object_storage, callback); + return createKeysIterator(getDataFilesImpl(filter_dag), object_storage, callback); } }
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 3f4005d153c4..23b0cd04b897 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -42,6 +42,11 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext const Poco::JSON::Object::Ptr & metadata_object, IcebergMetadataFilesCachePtr cache_ptr); + /// Get data files. On the first call it reads the manifest_list file and iterates through the manifest files to find all data files. + /// Subsequent calls while the same snapshot is relevant return the saved list of files (it cannot change + /// without a change to the metadata file). The cached list is dropped on every snapshot update. + Strings getDataFiles() const override { return getDataFilesImpl(nullptr); } + /// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override { @@ -118,7 +123,7 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext void updateState(const ContextPtr & local_context, bool metadata_file_changed); - Strings getDataFiles(const ActionsDAG * filter_dag) const; + Strings getDataFilesImpl(const ActionsDAG * filter_dag) const; void updateSnapshot(); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 218c9060aae2..7639ff35228e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -91,7 +91,8 @@ StorageObjectStorage::StorageObjectStorage( bool distributed_processing_, ASTPtr partition_by_, bool is_table_function_, - bool lazy_init) + bool lazy_init, + std::optional sample_path_) : IStorage(table_id_) , configuration(configuration_) , object_storage(object_storage_) @@ -131,7 +132,7 @@ StorageObjectStorage::StorageObjectStorage( if (!do_lazy_init) do_init(); - std::string sample_path; + std::string sample_path = sample_path_.value_or(""); ColumnsDescription columns{columns_}; resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context); configuration->check(context); @@ -353,6 +354,11 @@ std::optional StorageObjectStorage::Configuration::tryGetTab throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration"); } +std::optional StorageObjectStorage::Configuration::tryGetSamplePathFromMetadata() const +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetSamplePathFromMetadata is not implemented for basic configuration"); +} + void StorageObjectStorage::read( QueryPlan & query_plan, const Names & column_names, @@ -523,9 +529,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData( auto table_structure = configuration->tryGetTableStructureFromMetadata(); if (table_structure) - { return table_structure.value(); - } } ObjectInfos read_keys; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index cf4b89194e2d..5b4e10b9a872 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -71,7 +71,8 @@ class StorageObjectStorage : public IStorage bool distributed_processing_ = false, ASTPtr partition_by_ = nullptr, bool is_table_function_ = false, - bool lazy_init = false); + bool lazy_init = false, + std::optional sample_path_ = std::nullopt); String getName() const override; @@ -203,15 +204,15 @@ class StorageObjectStorage::Configuration virtual void addStructureAndFormatToArgsIfNeeded( ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) = 0; - bool withPartitionWildcard() const; + virtual bool withPartitionWildcard() const; bool withGlobs() const { return isPathWithGlobs() || isNamespaceWithGlobs(); } - bool withGlobsIgnorePartitionWildcard() const; - bool isPathWithGlobs() const; - bool isNamespaceWithGlobs() const; + virtual bool withGlobsIgnorePartitionWildcard() const; + virtual bool isPathWithGlobs() const; + virtual bool isNamespaceWithGlobs() const; virtual std::string getPathWithoutGlobs() const; virtual bool isArchive() const { return false; } - bool isPathInArchiveWithGlobs() const; + virtual bool isPathInArchiveWithGlobs() const; virtual std::string getPathInArchive() const; virtual void check(ContextPtr context) const; @@ 
-245,6 +246,7 @@ class StorageObjectStorage::Configuration ContextPtr local_context); virtual std::optional tryGetTableStructureFromMetadata() const; + virtual std::optional tryGetSamplePathFromMetadata() const; virtual bool supportsFileIterator() const { return false; } virtual ObjectIterator iterate( @@ -256,7 +258,7 @@ class StorageObjectStorage::Configuration } virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); - void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); + virtual void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); const StorageObjectStorageSettings & getSettingsRef() const; @@ -272,7 +274,7 @@ class StorageObjectStorage::Configuration virtual ObjectStorageType extractDynamicStorageType(ASTs & /* args */, ContextPtr /* context */, ASTPtr * /* type_arg */ = nullptr) const { return ObjectStorageType::None; } - void assertInitialized() const; + virtual void assertInitialized() const; virtual const String & getFormat() const { return format; } virtual const String & getCompressionMethod() const { return compression_method; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 58143a0bbefd..6fdb2c0d4b9d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -112,7 +112,10 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( format_settings_, mode_, /* distributed_processing */false, - partition_by_); + partition_by_, + /* is_table_function */false, + /* lazy_init */false, + sample_path); auto virtuals_ = getVirtualsPtr(); if (virtuals_) @@ -304,7 +307,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( {"s3", "s3Cluster"}, {"azureBlobStorage", "azureBlobStorageCluster"}, {"hdfs", "hdfsCluster"}, - {"iceberg", "icebergS3Cluster"}, + {"iceberg", "icebergCluster"}, {"icebergS3", "icebergS3Cluster"}, {"icebergAzure", "icebergAzureCluster"}, {"icebergHDFS", "icebergHDFSCluster"}, @@ -362,12 +365,13 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & local_context, ClusterPtr cluster) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, - local_context, predicate, {}, getVirtualsList(), nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true); + local_context, predicate, filter_actions_dag, getVirtualsList(), nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true); std::vector ids_of_hosts; for (const auto & shard : cluster->getShardsInfo()) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index fd4270518976..7b7031a7c728 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -31,6 +31,7 @@ class StorageObjectStorageCluster : public IStorageCluster RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & context, ClusterPtr cluster) const override; @@ -68,6 +69,22 @@ class 
StorageObjectStorageCluster : public IStorageCluster void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override; + std::optional totalRows(ContextPtr query_context) const override + { + if (pure_storage) + return pure_storage->totalRows(query_context); + configuration->update(object_storage, query_context); + return configuration->totalRows(); + } + + std::optional totalBytes(ContextPtr query_context) const override + { + if (pure_storage) + return pure_storage->totalBytes(query_context); + configuration->update(object_storage, query_context); + return configuration->totalBytes(); + } + private: void updateQueryToSendIfNeeded( ASTPtr & query,
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 3142a579ef4e..a7a3cc94ad06 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1164,7 +1164,7 @@ std::optional StorageDistributed::distributedWriteFromClusterStor const auto cluster = getCluster(); /// Select query is needed for pruning on virtual columns - auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster); + auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, filter, local_context, cluster); /// Here we take addresses from destination cluster and assume source table exists on these nodes size_t replica_index = 0;
diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index d70e92fa3b45..0ee356c7b88e 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -79,6 +79,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & /* filter_actions_dag */, const ContextPtr & context, ClusterPtr) const {
diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index 49d39a24ceba..93627fdc31d6 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -29,6 +29,7 @@ class StorageFileCluster : public IStorageCluster std::string getName() const override { return "FileCluster"; } RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & context, ClusterPtr) const override;
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index e051594fa8e3..6b8353d25556 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6013,7 +6013,7 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu ContextMutablePtr query_context = Context::createCopy(local_context); query_context->increaseDistributedDepth(); - auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster); + auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, {}, local_context, src_cluster); size_t replica_index = 0; for (const auto & replicas : src_cluster->getShardsAddresses())
diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 7ba7f22c62e8..9253e6bec0c6 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -116,6 +116,7 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS RemoteQueryExecutor::Extension
StorageURLCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & /* filter_actions_dag */, const ContextPtr & context, ClusterPtr) const { diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index 9bfbaffe30f8..884ca16e7b12 100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -32,6 +32,7 @@ class StorageURLCluster : public IStorageCluster std::string getName() const override { return "URLCluster"; } RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & context, ClusterPtr) const override; diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp index c7f60240b3c7..2f457dadee3b 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -9,7 +9,7 @@ namespace DB { -ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query) { auto * select_query = query->as(); if (!select_query || !select_query->tables()) @@ -17,10 +17,22 @@ ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) auto * tables = select_query->tables()->as(); auto * table_expression = tables->children[0]->as()->table_expression->as(); - if (!table_expression->table_function) + return table_expression; +} + +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query) +{ + auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? table_expression->table_function : nullptr; +} + +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +{ + auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); + if (!table_function_ast) return nullptr; - auto * table_function = table_expression->table_function->as(); + auto * table_function = table_function_ast->as(); return table_function; } diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index 0511d59f6230..e57d2d0bbac8 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -7,7 +7,10 @@ namespace DB { +struct ASTTableExpression; +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); diff --git a/src/TableFunctions/TableFunctionRemote.h b/src/TableFunctions/TableFunctionRemote.h index 0f75bf2b854c..4de60a79aea3 100644 --- a/src/TableFunctions/TableFunctionRemote.h +++ b/src/TableFunctions/TableFunctionRemote.h @@ -26,6 +26,8 @@ class TableFunctionRemote : public ITableFunction bool needStructureConversion() const override { return false; } + void setRemoteTableFunction(ASTPtr remote_table_function_ptr_) { remote_table_function_ptr = remote_table_function_ptr_; } + private: StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; diff --git a/tests/integration/compose/docker_compose_glue_catalog.yml b/tests/integration/compose/docker_compose_glue_catalog.yml index cf10632198a9..1f8cdc84361a 
100644 --- a/tests/integration/compose/docker_compose_glue_catalog.yml +++ b/tests/integration/compose/docker_compose_glue_catalog.yml @@ -30,7 +30,7 @@ services: mc: depends_on: - minio - image: minio/mc + image: minio/mc:RELEASE.2025-04-16T18-13-26Z container_name: mc environment: - AWS_ACCESS_KEY_ID=minio diff --git a/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml b/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml index 1f0e330999d8..7d18e21d2fcf 100644 --- a/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml +++ b/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml @@ -51,7 +51,7 @@ services: mc: depends_on: - minio - image: minio/mc + image: minio/mc:RELEASE.2025-04-16T18-13-26Z container_name: mc environment: - AWS_ACCESS_KEY_ID=minio diff --git a/tests/integration/test_cluster_discovery/test.py b/tests/integration/test_cluster_discovery/test.py index ebcb910479ca..be0c5b2f640d 100644 --- a/tests/integration/test_cluster_discovery/test.py +++ b/tests/integration/test_cluster_discovery/test.py @@ -125,6 +125,13 @@ def test_cluster_discovery_startup_and_stop(start_cluster): def test_cluster_discovery_macros(start_cluster): + # wait for all nodes to be started + check_nodes_count = functools.partial( + check_on_cluster, what="count()", msg="Wrong nodes count in cluster" + ) + total_nodes = len(nodes) - 1 + check_nodes_count([nodes["node_observer"]], total_nodes) + # check macros res = nodes["node_observer"].query( "SELECT sum(number) FROM clusterAllReplicas('{autocluster}', system.numbers) WHERE number=1" diff --git a/tests/integration/test_mask_sensitive_info/test.py b/tests/integration/test_mask_sensitive_info/test.py index c15ccfc9c237..8a1ef4e73ea1 100644 --- a/tests/integration/test_mask_sensitive_info/test.py +++ b/tests/integration/test_mask_sensitive_info/test.py @@ -498,9 +498,6 @@ def test_table_functions(): f"s3('http://minio1:9001/root/data/test6.csv', 'minio', '{password}', 'CSV')", f"s3('http://minio1:9001/root/data/test7.csv', 'minio', '{password}', 'CSV', 'x int')", f"s3('http://minio1:9001/root/data/test8.csv.gz', 'minio', '{password}', 'CSV', 'x int', 'gzip')", - f"s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test1.csv', 'minio', '{password}')", - f"s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test2.csv', 'CSV', 'x int')", - f"s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test3.csv', 'minio', '{password}', 'CSV')", f"remote('127.{{2..11}}', default.remote_table)", f"remote('127.{{2..11}}', default.remote_table, rand())", f"remote('127.{{2..11}}', default.remote_table, 'remote_user')", @@ -529,15 +526,6 @@ def test_table_functions(): f"azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')", f"azureBlobStorage(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", f"azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", - f"azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')", - f"azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')", - f"azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 
'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')", - f"azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '{azure_account_key}')", - f"azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '{azure_account_key}', 'CSV')", - f"azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none')", - f"azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')", - f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", - f"azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", f"iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", f"iceberg(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '{password}')", f"icebergS3('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", @@ -547,13 +535,6 @@ def test_table_functions(): f"icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')", f"icebergAzure(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", f"icebergAzure(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", - f"icebergS3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", - f"icebergS3Cluster('test_shard_localhost', named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '{password}')", - f"icebergAzureCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple.csv')", - f"icebergAzureCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '{azure_account_key}')", - f"icebergAzureCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '{azure_account_key}', 'CSV', 'none', 'auto')", - f"icebergAzureCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", - f"icebergAzureCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '{azure_account_key}')", f"iceberg(storage_type='s3', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')", f"iceberg(storage_type='s3', named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '{password}')", f"iceberg(storage_type='azure', '{azure_conn_string}', 'cont', 'test_simple.csv')", 
@@ -614,70 +595,51 @@ def make_test_case(i): "CREATE TABLE tablefunc8 (`x` int) AS s3('http://minio1:9001/root/data/test6.csv', 'minio', '[HIDDEN]', 'CSV')", "CREATE TABLE tablefunc9 (`x` int) AS s3('http://minio1:9001/root/data/test7.csv', 'minio', '[HIDDEN]', 'CSV', 'x int')", "CREATE TABLE tablefunc10 (`x` int) AS s3('http://minio1:9001/root/data/test8.csv.gz', 'minio', '[HIDDEN]', 'CSV', 'x int', 'gzip')", - "CREATE TABLE tablefunc11 (`x` int) AS s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test1.csv', 'minio', '[HIDDEN]')", - "CREATE TABLE tablefunc12 (x int) AS s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test2.csv', 'CSV', 'x int')", - "CREATE TABLE tablefunc13 (`x` int) AS s3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test3.csv', 'minio', '[HIDDEN]', 'CSV')", - "CREATE TABLE tablefunc14 (x int) AS remote('127.{2..11}', default.remote_table)", - "CREATE TABLE tablefunc15 (x int) AS remote('127.{2..11}', default.remote_table, rand())", - "CREATE TABLE tablefunc16 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user')", - "CREATE TABLE tablefunc17 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]')", - "CREATE TABLE tablefunc18 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user', rand())", - "CREATE TABLE tablefunc19 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]', rand())", - "CREATE TABLE tablefunc20 (`x` int) AS remote('127.{2..11}', 'default.remote_table', 'remote_user', '[HIDDEN]', rand())", - "CREATE TABLE tablefunc21 (`x` int) AS remote('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]', rand())", - "CREATE TABLE tablefunc22 (`x` int) AS remote('127.{2..11}', numbers(10), 'remote_user', '[HIDDEN]', rand())", - "CREATE TABLE tablefunc23 (`x` int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]')", - "CREATE TABLE tablefunc24 (x int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', rand())", - "CREATE TABLE tablefunc25 (`x` int) AS mysql(named_collection_1, host = 'mysql80', port = 3306, database = 'mysql_db', `table` = 'mysql_table', user = 'mysql_user', password = '[HIDDEN]')", - "CREATE TABLE tablefunc26 (`x` int) AS postgresql(named_collection_2, password = '[HIDDEN]', host = 'postgres1', port = 5432, database = 'postgres_db', `table` = 'postgres_table', user = 'postgres_user')", - "CREATE TABLE tablefunc27 (`x` int) AS s3(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", - "CREATE TABLE tablefunc28 (`x` int) AS remote(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]', sharding_key = rand())", - "CREATE TABLE tablefunc29 (`x` int) AS remoteSecure(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]')", - "CREATE TABLE tablefunc30 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')", - "CREATE TABLE tablefunc31 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')", - "CREATE TABLE tablefunc32 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", - f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')", - f"CREATE TABLE tablefunc34 (`x` 
int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')", - f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')", - f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')", - f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')", - f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')", - f"CREATE TABLE tablefunc39 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')", - f"CREATE TABLE tablefunc40 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", - f"CREATE TABLE tablefunc41 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", - f"CREATE TABLE tablefunc42 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')", - f"CREATE TABLE tablefunc43 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')", - f"CREATE TABLE tablefunc44 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')", - f"CREATE TABLE tablefunc45 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_12.csv', '{azure_account_name}', '[HIDDEN]')", - f"CREATE TABLE tablefunc46 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_13.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')", - f"CREATE TABLE tablefunc47 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_14.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')", - f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')", - f"CREATE TABLE tablefunc49 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')", - f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", - "CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", - "CREATE TABLE tablefunc52 (`x` int) AS iceberg(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", - "CREATE TABLE tablefunc53 (`x` int) AS 
icebergS3('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", - "CREATE TABLE tablefunc54 (`x` int) AS icebergS3(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", - f"CREATE TABLE tablefunc55 (`x` int) AS icebergAzure('{masked_azure_conn_string}', 'cont', 'test_simple.csv')", - f"CREATE TABLE tablefunc56 (`x` int) AS icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')", - f"CREATE TABLE tablefunc57 (`x` int) AS icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')", - f"CREATE TABLE tablefunc58 (`x` int) AS icebergAzure(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", - f"CREATE TABLE tablefunc59 (`x` int) AS icebergAzure(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", - "CREATE TABLE tablefunc60 (`x` int) AS icebergS3Cluster('test_shard_localhost', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", - "CREATE TABLE tablefunc61 (`x` int) AS icebergS3Cluster('test_shard_localhost', named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", - f"CREATE TABLE tablefunc62 (`x` int) AS icebergAzureCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple.csv')", - f"CREATE TABLE tablefunc63 (`x` int) AS icebergAzureCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')", - f"CREATE TABLE tablefunc64 (`x` int) AS icebergAzureCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')", - f"CREATE TABLE tablefunc65 (`x` int) AS icebergAzureCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", - f"CREATE TABLE tablefunc66 (`x` int) AS icebergAzureCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", - "CREATE TABLE tablefunc67 (`x` int) AS iceberg(storage_type = 's3', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", - "CREATE TABLE tablefunc68 (`x` int) AS iceberg(storage_type = 's3', named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", - f"CREATE TABLE tablefunc69 (`x` int) AS iceberg(storage_type = 'azure', '{masked_azure_conn_string}', 'cont', 'test_simple.csv')", - f"CREATE TABLE tablefunc70 (`x` int) AS iceberg(storage_type = 'azure', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')", - f"CREATE TABLE tablefunc71 (`x` int) AS iceberg(storage_type = 'azure', '{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')", - f"CREATE TABLE tablefunc72 (`x` int) AS iceberg(storage_type = 'azure', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 
'test_simple_7.csv', format = 'CSV')", - f"CREATE TABLE tablefunc73 (`x` int) AS iceberg(storage_type = 'azure', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", - "CREATE TABLE tablefunc74 (`x` int) AS gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", + "CREATE TABLE tablefunc11 (x int) AS remote('127.{2..11}', default.remote_table)", + "CREATE TABLE tablefunc12 (x int) AS remote('127.{2..11}', default.remote_table, rand())", + "CREATE TABLE tablefunc13 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user')", + "CREATE TABLE tablefunc14 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]')", + "CREATE TABLE tablefunc15 (x int) AS remote('127.{2..11}', default.remote_table, 'remote_user', rand())", + "CREATE TABLE tablefunc16 (`x` int) AS remote('127.{2..11}', default.remote_table, 'remote_user', '[HIDDEN]', rand())", + "CREATE TABLE tablefunc17 (`x` int) AS remote('127.{2..11}', 'default.remote_table', 'remote_user', '[HIDDEN]', rand())", + "CREATE TABLE tablefunc18 (`x` int) AS remote('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]', rand())", + "CREATE TABLE tablefunc19 (`x` int) AS remote('127.{2..11}', numbers(10), 'remote_user', '[HIDDEN]', rand())", + "CREATE TABLE tablefunc20 (`x` int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', '[HIDDEN]')", + "CREATE TABLE tablefunc21 (x int) AS remoteSecure('127.{2..11}', 'default', 'remote_table', 'remote_user', rand())", + "CREATE TABLE tablefunc22 (`x` int) AS mysql(named_collection_1, host = 'mysql80', port = 3306, database = 'mysql_db', `table` = 'mysql_table', user = 'mysql_user', password = '[HIDDEN]')", + "CREATE TABLE tablefunc23 (`x` int) AS postgresql(named_collection_2, password = '[HIDDEN]', host = 'postgres1', port = 5432, database = 'postgres_db', `table` = 'postgres_table', user = 'postgres_user')", + "CREATE TABLE tablefunc24 (`x` int) AS s3(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", + "CREATE TABLE tablefunc25 (`x` int) AS remote(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]', sharding_key = rand())", + "CREATE TABLE tablefunc26 (`x` int) AS remoteSecure(named_collection_6, addresses_expr = '127.{2..11}', database = 'default', `table` = 'remote_table', user = 'remote_user', password = '[HIDDEN]')", + "CREATE TABLE tablefunc27 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')", + "CREATE TABLE tablefunc28 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')", + "CREATE TABLE tablefunc29 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", + f"CREATE TABLE tablefunc30 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')", + f"CREATE TABLE tablefunc31 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')", + f"CREATE TABLE tablefunc32 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')", + f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')", + f"CREATE 
TABLE tablefunc34 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')", + f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')", + f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')", + f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", + "CREATE TABLE tablefunc39 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", + "CREATE TABLE tablefunc40 (`x` int) AS iceberg(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", + "CREATE TABLE tablefunc41 (`x` int) AS icebergS3('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", + "CREATE TABLE tablefunc42 (`x` int) AS icebergS3(named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", + f"CREATE TABLE tablefunc43 (`x` int) AS icebergAzure('{masked_azure_conn_string}', 'cont', 'test_simple.csv')", + f"CREATE TABLE tablefunc44 (`x` int) AS icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')", + f"CREATE TABLE tablefunc45 (`x` int) AS icebergAzure('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')", + f"CREATE TABLE tablefunc46 (`x` int) AS icebergAzure(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"CREATE TABLE tablefunc47 (`x` int) AS icebergAzure(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", + "CREATE TABLE tablefunc48 (`x` int) AS iceberg(storage_type = 's3', 'http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", + "CREATE TABLE tablefunc49 (`x` int) AS iceberg(storage_type = 's3', named_collection_2, url = 'http://minio1:9001/root/data/test4.csv', access_key_id = 'minio', secret_access_key = '[HIDDEN]')", + f"CREATE TABLE tablefunc50 (`x` int) AS iceberg(storage_type = 'azure', '{masked_azure_conn_string}', 'cont', 'test_simple.csv')", + f"CREATE TABLE tablefunc51 (`x` int) AS iceberg(storage_type = 'azure', '{azure_storage_account_url}', 'cont', 'test_simple.csv', '{azure_account_name}', '[HIDDEN]')", + f"CREATE TABLE tablefunc52 (`x` int) AS iceberg(storage_type = 'azure', '{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')", + f"CREATE TABLE tablefunc53 (`x` int) AS iceberg(storage_type = 'azure', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')", + f"CREATE TABLE tablefunc54 (`x` int) AS iceberg(storage_type = 
'azure', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')", + "CREATE TABLE tablefunc55 (`x` int) AS gcs('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')", ], must_not_contain=[password], ) diff --git a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml index 892665d3934d..77f9e7e4b17b 100644 --- a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml +++ b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml @@ -14,7 +14,7 @@ http://minio1:9001/root/ minio - minio123 + ClickHouse_Minio_P@ssw0rd s3 diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 8b4a2008d9c0..cede5ccf9c64 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -1,6 +1,7 @@ import logging import os import uuid +import time from datetime import datetime, timezone import pyspark @@ -580,7 +581,7 @@ def test_types(started_cluster, format_version, storage_type): [ ["a", "Nullable(Int32)"], ["b", "Nullable(String)"], - ["c", "Nullable(Date32)"], + ["c", "Nullable(Date)"], ["d", "Array(Nullable(String))"], ["e", "Nullable(Bool)"], ] @@ -603,7 +604,7 @@ def test_types(started_cluster, format_version, storage_type): [ ["a", "Nullable(Int32)"], ["b", "Nullable(String)"], - ["c", "Nullable(Date32)"], + ["c", "Nullable(Date)"], ["d", "Array(Nullable(String))"], ["e", "Nullable(Bool)"], ] @@ -844,14 +845,6 @@ def make_query_from_table(alt_syntax=False): count_secondary_subqueries(started_cluster, query_id_pure_table_engine_with_type_in_nc, 0, "table engine with storage type in named collection") count_secondary_subqueries(started_cluster, query_id_pure_table_engine_cluster_with_type_in_nc, 1, "table engine with cluster setting with storage type in named collection") - select_remote_cluster = ( - instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})") - .strip() - .split() - ) - assert len(select_remote_cluster) == 600 - assert select_remote_cluster == select_regular - @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) @@ -2134,7 +2127,10 @@ def test_filesystem_cache(started_cluster, storage_type): @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) -def test_partition_pruning(started_cluster, storage_type): +@pytest.mark.parametrize("run_on_cluster", [False, True]) +def test_partition_pruning(started_cluster, storage_type, run_on_cluster): + if run_on_cluster and storage_type == "local": + pytest.skip("Local storage is not supported on cluster") instance = started_cluster.instances["node1"] spark = started_cluster.spark_session TABLE_NAME = "test_partition_pruning_" + storage_type + "_" + get_uuid_str() @@ -2182,7 +2178,7 @@ def execute_spark_query(query: str): ) creation_expression = get_creation_expression( - storage_type, TABLE_NAME, started_cluster, table_function=True + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster ) def check_validity_and_get_prunned_files(select_expression): @@ -3078,7 +3074,10 @@ def test_explicit_metadata_file(started_cluster, storage_type): @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) -def 
test_minmax_pruning_with_null(started_cluster, storage_type): +@pytest.mark.parametrize("run_on_cluster", [False, True]) +def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster): + if run_on_cluster and storage_type == "local": + pytest.skip("Local storage is not supported on cluster") instance = started_cluster.instances["node1"] spark = started_cluster.spark_session TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str() @@ -3149,7 +3148,7 @@ def execute_spark_query(query: str): ) creation_expression = get_creation_expression( - storage_type, TABLE_NAME, started_cluster, table_function=True + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster ) def check_validity_and_get_prunned_files(select_expression):