diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index 41064d17a5a9..2ba407b37d97 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -157,6 +157,10 @@
 # include
 #endif
 
+#if USE_PARQUET
+# include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
+#endif
+
 #include
 
 /// A minimal file used when the server is run without installation
@@ -338,6 +342,7 @@ namespace ServerSetting
     extern const ServerSettingsBool abort_on_logical_error;
     extern const ServerSettingsUInt64 jemalloc_flush_profile_interval_bytes;
     extern const ServerSettingsBool jemalloc_flush_profile_on_memory_exceeded;
+    extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
 }
 
 namespace ErrorCodes
@@ -2524,6 +2529,10 @@ try
 
     auto replicas_reconnector = ReplicasReconnector::init(global_context);
 
+#if USE_PARQUET
+    ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);
+#endif
+
     /// Set current database name before loading tables and databases because
     /// system logs may copy global context.
     std::string default_database = server_settings[ServerSetting::default_database];
diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h
index 7fb8cdb95243..060e44283d35 100644
--- a/src/Access/Common/AccessType.h
+++ b/src/Access/Common/AccessType.h
@@ -320,6 +320,7 @@ enum class AccessType : uint8_t
     M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
     M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
     M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
+    M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
     M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
     M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
     M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index f0f0f024a34d..7bad1790d736 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -1139,7 +1139,8 @@ The server successfully detected this situation and will download merged part fr
     M(AsyncLoggingErrorFileLogDroppedMessages, "How many messages have been dropped from error file log due to the async log queue being full", ValueType::Number) \
     M(AsyncLoggingSyslogDroppedMessages, "How many messages have been dropped from the syslog due to the async log queue being full", ValueType::Number) \
     M(AsyncLoggingTextLogDroppedMessages, "How many messages have been dropped from text_log due to the async log queue being full", ValueType::Number) \
-
+    M(ParquetMetaDataCacheHits, "Number of times a parquet file metadata lookup was served from the metadata cache.", ValueType::Number) \
+    M(ParquetMetaDataCacheMisses, "Number of times a parquet file metadata lookup missed the metadata cache and had to read the file.", ValueType::Number) \
 
 #ifdef APPLY_FOR_EXTERNAL_EVENTS
 #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)
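Both counters surface as regular ProfileEvents, so cache effectiveness can be checked server-wide as well as per query. A minimal probe, assuming nothing beyond the event names declared above:

-- Cumulative, server-wide counters for the new cache.
SELECT event, value, description
FROM system.events
WHERE event IN ('ParquetMetaDataCacheHits', 'ParquetMetaDataCacheMisses');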
diff --git a/src/Core/FormatFactorySettings.h b/src/Core/FormatFactorySettings.h
index 41a2bbaa98a0..03caeae01bec 100644
--- a/src/Core/FormatFactorySettings.h
+++ b/src/Core/FormatFactorySettings.h
@@ -1458,8 +1458,7 @@ Use geo column parser to convert Array(UInt8) into Point/Linestring/Polygon/Mult
     DECLARE(Bool, output_format_parquet_geometadata, true, R"(
Allow to write information about geo columns in parquet metadata and encode columns in WKB format.
)", 0) \
-
-
+    DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable parquet file metadata caching)", 0) \
 
 // End of FORMAT_FACTORY_SETTINGS
 
 #define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \
diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp
index 004f8a16098c..e861bf43e85f 100644
--- a/src/Core/ServerSettings.cpp
+++ b/src/Core/ServerSettings.cpp
@@ -1139,8 +1139,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
     DECLARE(UInt64, threadpool_local_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from local filesystem.)", 0) \
     DECLARE(NonZeroUInt64, threadpool_remote_fs_reader_pool_size, 250, R"(Number of threads in the Thread pool used for reading from remote filesystem when `remote_filesystem_read_method = 'threadpool'`.)", 0) \
     DECLARE(UInt64, threadpool_remote_fs_reader_queue_size, 1000000, R"(The maximum number of jobs that can be scheduled on the thread pool for reading from remote filesystem.)", 0) \
-
-
+    DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
 
 // clang-format on
 
 /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index bc2cf3c8f5f4..ff1a977c00ea 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -279,6 +279,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"parallel_hash_join_threshold", 0, 0, "New setting"},
         /// Release closed. Please use 25.4
     });
+    addSettingsChanges(settings_changes_history, "24.12.2.20000",
+    {
+        // Altinity Antalya modifications atop of 24.12
+        {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586
+    });
     addSettingsChanges(settings_changes_history, "25.2",
     {
         /// Release closed. Please use 25.3
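The patch adds two knobs: a per-session format setting that toggles the cache and a server setting that caps its size. A hedged way to inspect and use the session-level one from SQL:

-- Session-level switch (defaults to 1 per this patch).
SELECT name, value, changed
FROM system.settings
WHERE name = 'input_format_parquet_use_metadata_cache';

-- Opt out for the current session only.
SET input_format_parquet_use_metadata_cache = 0;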
diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp
index 793e3fc0a678..87323976172a 100644
--- a/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/src/Interpreters/InterpreterSystemQuery.cpp
@@ -81,6 +81,10 @@
 #include
 #endif
 
+#if USE_PARQUET
+#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
+#endif
+
 #if USE_AWS_S3
 #include
 #endif
@@ -436,6 +440,16 @@ BlockIO InterpreterSystemQuery::execute()
             getContext()->clearQueryResultCache(query.query_result_cache_tag);
             break;
         }
+        case Type::DROP_PARQUET_METADATA_CACHE:
+        {
+#if USE_PARQUET
+            getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE);
+            ParquetFileMetaDataCache::instance()->clear();
+            break;
+#else
+            throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without support for Parquet");
+#endif
+        }
         case Type::DROP_COMPILED_EXPRESSION_CACHE:
 #if USE_EMBEDDED_COMPILER
             getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
@@ -1589,6 +1603,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
         case Type::DROP_PAGE_CACHE:
         case Type::DROP_SCHEMA_CACHE:
         case Type::DROP_FORMAT_SCHEMA_CACHE:
+        case Type::DROP_PARQUET_METADATA_CACHE:
         case Type::DROP_S3_CLIENT_CACHE:
         {
             required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp
index 12f1e7ac4d71..2df2ce77aef8 100644
--- a/src/Parsers/ASTSystemQuery.cpp
+++ b/src/Parsers/ASTSystemQuery.cpp
@@ -484,6 +484,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
         case Type::DROP_COMPILED_EXPRESSION_CACHE:
         case Type::DROP_S3_CLIENT_CACHE:
         case Type::DROP_ICEBERG_METADATA_CACHE:
+        case Type::DROP_PARQUET_METADATA_CACHE:
         case Type::RESET_COVERAGE:
         case Type::RESTART_REPLICAS:
         case Type::JEMALLOC_PURGE:
diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h
index 3226a93eed63..53d2b0001c4f 100644
--- a/src/Parsers/ASTSystemQuery.h
+++ b/src/Parsers/ASTSystemQuery.h
@@ -41,6 +41,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
         DROP_SCHEMA_CACHE,
         DROP_FORMAT_SCHEMA_CACHE,
         DROP_S3_CLIENT_CACHE,
+        DROP_PARQUET_METADATA_CACHE,
         STOP_LISTEN,
         START_LISTEN,
         RESTART_REPLICAS,
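With the parser, formatter, and interpreter wired up, the cache can be dropped like any other global cache, and because the new access type is grouped under SYSTEM_DROP_CACHE the broader grant covers it too. A sketch, with a hypothetical role and cluster name:

GRANT SYSTEM DROP PARQUET METADATA CACHE ON *.* TO cache_admin;

SYSTEM DROP PARQUET METADATA CACHE;

-- ON CLUSTER should work as for the other DROP ... CACHE statements,
-- given the getRequiredAccessForDDLOnCluster() case added above.
SYSTEM DROP PARQUET METADATA CACHE ON CLUSTER 'my_cluster';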
diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h
index 31a7e816f24a..d7a86cf52b24 100644
--- a/src/Processors/Formats/IInputFormat.h
+++ b/src/Processors/Formats/IInputFormat.h
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include
 
 namespace DB
 
@@ -79,6 +80,9 @@ class IInputFormat : public ISource
 
     void needOnlyCount() { need_only_count = true; }
 
+    /// Set an additional key/id related to the underlying storage of the ReadBuffer
+    virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}
+
 protected:
     ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index e65beb30d385..b47fe1dd5f3c 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -3,6 +3,9 @@
 
 #if USE_PARQUET
 
+#include
+#include
+#include
 #include
 #include
 #include
@@ -30,6 +33,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -41,6 +45,8 @@ namespace ProfileEvents
     extern const Event ParquetFetchWaitTimeMicroseconds;
     extern const Event ParquetReadRowGroups;
     extern const Event ParquetPrunedRowGroups;
+    extern const Event ParquetMetaDataCacheHits;
+    extern const Event ParquetMetaDataCacheMisses;
 }
 
 namespace CurrentMetrics
@@ -57,6 +63,16 @@ namespace CurrentMetrics
 namespace DB
 {
 
+namespace Setting
+{
+    extern const SettingsBool input_format_parquet_use_metadata_cache;
+}
+
+namespace ServerSetting
+{
+    extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
+}
+
 namespace ErrorCodes
 {
     extern const int BAD_ARGUMENTS;
@@ -543,6 +559,49 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa
     return hyperrectangle;
 }
 
+std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
+{
+    createArrowFileIfNotCreated();
+    return parquet::ReadMetaData(arrow_file);
+}
+
+std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
+{
+    // The in-memory cache is implemented only for remote files, not for local file operations.
+    // A user may still set `input_format_parquet_use_metadata_cache=1` for a local file operation,
+    // in which case the cache key is never set. Therefore, metadata_cache.key must be checked as well.
+    if (!metadata_cache.use_cache || metadata_cache.key.empty())
+    {
+        return readMetadataFromFile();
+    }
+
+    auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet(
+        metadata_cache.key,
+        [&]()
+        {
+            return readMetadataFromFile();
+        }
+    );
+    if (loaded)
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
+    else
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
+    return parquet_file_metadata;
+}
+
+void ParquetBlockInputFormat::createArrowFileIfNotCreated()
+{
+    if (arrow_file)
+    {
+        return;
+    }
+
+    // Create arrow file adapter.
+    // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
+    // we'll need to read (which we know in advance). Use max_download_threads for that.
+    arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
+}
+
 std::unordered_set getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn)
 {
     std::unordered_set column_keys;
@@ -656,7 +715,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
     if (is_stopped)
         return;
 
-    metadata = parquet::ReadMetaData(arrow_file);
+    metadata = getFileMetaData();
 
     const bool prefetch_group = io_pool != nullptr;
     std::shared_ptr schema;
@@ -762,6 +821,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
         }
     }
 
+    bool has_row_groups_to_read = false;
+
     auto skip_row_group_based_on_filters = [&](int row_group)
     {
         if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down)
@@ -820,7 +881,20 @@ void ParquetBlockInputFormat::initializeIfNeeded()
         row_group_batches.back().total_bytes_compressed += row_group_size;
         auto rows = adaptive_chunk_size(row_group);
         row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
+
+        has_row_groups_to_read = true;
     }
+
+    if (has_row_groups_to_read)
+    {
+        createArrowFileIfNotCreated();
+    }
+}
+
+void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
+{
+    metadata_cache.key = key_;
+    metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
 }
 
 void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
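One consequence of the early-out in getFileMetaData() worth noting: only sources that call setStorageRelatedUniqueKey() ever populate the cache, so plain file() reads bypass it even with the setting enabled. A hedged probe (local path assumed):

SELECT count()
FROM file('demo.parquet', Parquet)
SETTINGS input_format_parquet_use_metadata_cache = 1, log_comment = 'md_cache_local_probe';

SYSTEM FLUSH LOGS;

-- Expect both counters to be zero: no storage key is set for local files.
SELECT ProfileEvents['ParquetMetaDataCacheHits'] AS hits,
       ProfileEvents['ParquetMetaDataCacheMisses'] AS misses
FROM system.query_log
WHERE log_comment = 'md_cache_local_probe' AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1;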
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
index cf30aa6e36b1..1038e1d35037 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -72,6 +72,8 @@ class ParquetBlockInputFormat : public IInputFormat
 
     size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }
 
+    void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;
+
 private:
     Chunk read() override;
 
@@ -90,6 +92,13 @@ class ParquetBlockInputFormat : public IInputFormat
 
     void threadFunction(size_t row_group_batch_idx);
 
+    void createArrowFileIfNotCreated();
+    std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();
+
+    std::shared_ptr<parquet::FileMetaData> getFileMetaData();
+
+    inline bool supportPrefetch() const;
+
     // Data layout in the file:
     //
     // row group 0
@@ -340,6 +349,13 @@ class ParquetBlockInputFormat : public IInputFormat
     bool is_initialized = false;
     std::optional> parquet_names_to_clickhouse;
     std::optional> clickhouse_names_to_parquet;
+    struct Cache
+    {
+        String key;
+        bool use_cache = false;
+    };
+
+    Cache metadata_cache;
 };
 
 class ArrowParquetSchemaReader : public ISchemaReader
diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp
new file mode 100644
index 000000000000..da8ad825f505
--- /dev/null
+++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp
@@ -0,0 +1,20 @@
+#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
+
+#if USE_PARQUET
+
+namespace DB
+{
+
+ParquetFileMetaDataCache::ParquetFileMetaDataCache()
+    : CacheBase(CurrentMetrics::end(), CurrentMetrics::end(), 0)
+{}
+
+ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance()
+{
+    static ParquetFileMetaDataCache instance;
+    return &instance;
+}
+
+}
+
+#endif
diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h
new file mode 100644
index 000000000000..fb5fc1bb0217
--- /dev/null
+++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_PARQUET
+
+namespace parquet
+{
+
+class FileMetaData;
+
+}
+
+#include <Common/CacheBase.h>
+
+namespace DB
+{
+
+class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
+{
+public:
+    static ParquetFileMetaDataCache * instance();
+
+private:
+    ParquetFileMetaDataCache();
+};
+
+}
+
+#endif
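Because the cache is a process-wide singleton with no per-query state, the only runtime controls are the session toggle and the explicit drop; resizing requires a config change plus a server restart, since the limit is applied once at startup (see the Server.cpp hunk above). A hedged check that the limit is startup-only:

SELECT name, value, changeable_without_restart
FROM system.server_settings
WHERE name = 'input_format_parquet_metadata_cache_max_size';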
diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp
index 4e8973c2bc61..1a9ac2cb0379 100644
--- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp
@@ -22,8 +22,17 @@
 #include
 #include
 #include
+#include
+#include
+#include
 
+namespace ProfileEvents
+{
+extern const Event ParquetMetaDataCacheHits;
+extern const Event ParquetMetaDataCacheMisses;
+}
+
 namespace DB
 {
 
@@ -32,6 +41,11 @@ namespace ErrorCodes
     extern const int BAD_ARGUMENTS;
 }
 
+namespace Setting
+{
+extern const SettingsBool input_format_parquet_use_metadata_cache;
+}
+
 static NamesAndTypesList getHeaderForParquetMetadata()
 {
     NamesAndTypesList names_and_types{
@@ -130,10 +144,35 @@ void checkHeader(const Block & header)
 static std::shared_ptr<parquet::FileMetaData> getFileMetadata(
     ReadBuffer & in,
     const FormatSettings & format_settings,
-    std::atomic & is_stopped)
+    std::atomic & is_stopped,
+    ParquetMetadataInputFormat::Cache metadata_cache)
 {
-    auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
-    return parquet::ReadMetaData(arrow_file);
+    // The in-memory cache is implemented only for remote files, not for local file operations.
+    // A user may still set `input_format_parquet_use_metadata_cache=1` for a local file operation,
+    // in which case the cache key is never set. Therefore, metadata_cache.key must be checked as well.
+    if (!metadata_cache.use_cache || metadata_cache.key.empty())
+    {
+        auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
+        return parquet::ReadMetaData(arrow_file);
+    }
+
+    auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet(
+        metadata_cache.key,
+        [&]()
+        {
+            auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
+            return parquet::ReadMetaData(arrow_file);
+        }
+    );
+
+    if (loaded)
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
+    else
+        ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
+
+    return parquet_file_metadata;
+
+
 }
 
 ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, SharedHeader header_, const FormatSettings & format_settings_)
@@ -148,7 +187,7 @@ Chunk ParquetMetadataInputFormat::read()
     if (done)
         return res;
 
-    auto metadata = getFileMetadata(*in, format_settings, is_stopped);
+    auto metadata = getFileMetadata(*in, format_settings, is_stopped, metadata_cache);
 
     const auto & header = getPort().getHeader();
     auto names_and_types = getHeaderForParquetMetadata();
@@ -489,6 +528,12 @@ void ParquetMetadataInputFormat::resetParser()
     done = false;
 }
 
+void ParquetMetadataInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
+{
+    metadata_cache.key = key_;
+    metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
+}
+
 ParquetMetadataSchemaReader::ParquetMetadataSchemaReader(ReadBuffer & in_)
     : ISchemaReader(in_)
 {
diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h
index 81cf7890ee7e..6b667dcc5b1e 100644
--- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h
@@ -62,6 +62,14 @@ class ParquetMetadataInputFormat : public IInputFormat
 
     void resetParser() override;
 
+    void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;
+
+    struct Cache
+    {
+        String key;
+        bool use_cache = false;
+    };
+
 private:
     Chunk read() override;
 
@@ -78,6 +86,8 @@ class ParquetMetadataInputFormat : public IInputFormat
     const FormatSettings format_settings;
     bool done = false;
     std::atomic is_stopped{0};
+
+    Cache metadata_cache;
 };
 
 class ParquetMetadataSchemaReader : public ISchemaReader
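The ParquetMetadata pseudo-format now shares the same cached footer, so repeatedly inspecting a remote file's metadata stops refetching it. A hedged example (named collection and path assumed):

-- A repeated run of this query should register a ParquetMetaDataCacheHits event.
SELECT num_columns, num_rows, num_row_groups
FROM s3(s3_conn, filename = 'demo.parquet', format = ParquetMetadata);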
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index f780d9d870cd..c37be075284b 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -733,6 +733,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     if (need_only_count)
         input_format->needOnlyCount();
 
+    if (!object_info->getPath().empty())
+        input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);
+
     builder.init(Pipe(input_format));
 
     configuration->addDeleteTransformers(object_info, builder, format_settings, context_);
diff --git a/tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml b/tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml
new file mode 100644
index 000000000000..bc34464e30da
--- /dev/null
+++ b/tests/integration/test_storage_delta/configs/users.d/disable_parquet_metadata_caching.xml
@@ -0,0 +1,7 @@
+<clickhouse>
+    <profiles>
+        <default>
+            <input_format_parquet_use_metadata_cache>0</input_format_parquet_use_metadata_cache>
+        </default>
+    </profiles>
+</clickhouse>
diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py
index 6241ed6ecd8a..3b40e1eb4870 100644
--- a/tests/integration/test_storage_delta/test.py
+++ b/tests/integration/test_storage_delta/test.py
@@ -101,6 +101,7 @@ def started_cluster():
             user_configs=[
                 "configs/users.d/users.xml",
                 "configs/users.d/enable_writes.xml",
+                "configs/users.d/disable_parquet_metadata_caching.xml",
             ],
             with_minio=True,
             with_azurite=True,
@@ -117,6 +118,7 @@ def started_cluster():
             user_configs=[
                 "configs/users.d/users.xml",
                 "configs/users.d/enable_writes.xml",
+                "configs/users.d/disable_parquet_metadata_caching.xml",
             ],
             with_minio=True,
             stay_alive=True,
@@ -163,6 +165,7 @@ def started_cluster():
             user_configs=[
                 "configs/users.d/users.xml",
                 "configs/users.d/disabled_delta_kernel.xml",
+                "configs/users.d/disable_parquet_metadata_caching.xml",
             ],
             with_minio=True,
             with_azurite=True,
@@ -1374,7 +1377,7 @@ def test_session_token(started_cluster):
     parquet_data_path = create_initial_data_file(
         started_cluster,
         instance,
-        "SELECT toUInt64(number), toString(number) FROM numbers(100)",
+        "SELECT toUInt64(number), toString(number) FROM numbers(100) SETTINGS input_format_parquet_use_metadata_cache=0",
         TABLE_NAME,
         node_name=node_name,
    )
@@ -1387,7 +1390,7 @@ def test_session_token(started_cluster):
             f"""
             SELECT count() FROM deltaLake(
                 'http://{started_cluster.minio_host}:{started_cluster.minio_port}/{started_cluster.minio_bucket}/{TABLE_NAME}/',
-                SETTINGS allow_experimental_delta_kernel_rs=1)
+                SETTINGS allow_experimental_delta_kernel_rs=1, input_format_parquet_use_metadata_cache=0)
             """
         )
     )
diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference
index c1278b6acda8..20739c2a3262 100644
--- a/tests/queries/0_stateless/01271_show_privileges.reference
+++ b/tests/queries/0_stateless/01271_show_privileges.reference
@@ -133,6 +133,7 @@ SYSTEM DROP PAGE CACHE ['SYSTEM DROP PAGE CACHE','DROP PAGE CACHE'] GLOBAL SYSTE
 SYSTEM DROP SCHEMA CACHE ['SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
 SYSTEM DROP FORMAT SCHEMA CACHE ['SYSTEM DROP FORMAT SCHEMA CACHE','DROP FORMAT SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
 SYSTEM DROP S3 CLIENT CACHE ['SYSTEM DROP S3 CLIENT','DROP S3 CLIENT CACHE'] GLOBAL SYSTEM DROP CACHE
+SYSTEM DROP PARQUET METADATA CACHE ['SYSTEM DROP PARQUET METADATA CACHE'] GLOBAL SYSTEM DROP CACHE
 SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM
 SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD
 SYSTEM RELOAD USERS ['RELOAD USERS'] GLOBAL SYSTEM RELOAD
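The cache key concatenates the object path with its ETag, which gives invalidation for free: rewriting an object yields a new ETag and therefore a new key, so stale metadata is never served and the old entry simply ages out of the LRU. A hedged illustration (the s3_conn named collection and path are assumptions):

INSERT INTO FUNCTION s3(s3_conn, filename = 'etag_demo.parquet', format = Parquet)
SELECT number FROM numbers(5) SETTINGS s3_truncate_on_insert = 1;

-- First read: a miss, then the footer is cached under path:etag.
SELECT count() FROM s3(s3_conn, filename = 'etag_demo.parquet', format = Parquet);

INSERT INTO FUNCTION s3(s3_conn, filename = 'etag_demo.parquet', format = Parquet)
SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert = 1;

-- Miss again: the rewrite changed the ETag, hence the key.
SELECT count() FROM s3(s3_conn, filename = 'etag_demo.parquet', format = Parquet);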
diff --git a/tests/queries/0_stateless/02995_settings_25_7_1.tsv b/tests/queries/0_stateless/02995_settings_25_7_1.tsv
index 490c9e15bc53..bfe94fd4413e 100644
--- a/tests/queries/0_stateless/02995_settings_25_7_1.tsv
+++ b/tests/queries/0_stateless/02995_settings_25_7_1.tsv
@@ -584,6 +584,7 @@ input_format_parquet_case_insensitive_column_matching 0
 input_format_parquet_enable_json_parsing 1
 input_format_parquet_enable_row_group_prefetch 1
 input_format_parquet_filter_push_down 1
+input_format_parquet_use_metadata_cache 1
 input_format_parquet_import_nested 0
 input_format_parquet_local_file_min_bytes_for_seek 8192
 input_format_parquet_max_block_size 65409
diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference
new file mode 100644
index 000000000000..c87ad9008b60
--- /dev/null
+++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference
@@ -0,0 +1,8 @@
+10
+10
+10
+10
+10
+10
+0
+10
diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql
new file mode 100644
index 000000000000..b7b0501e4875
--- /dev/null
+++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql
@@ -0,0 +1,61 @@
+-- Tags: no-parallel, no-fasttest, no-parallel-replicas
+
+DROP TABLE IF EXISTS t_parquet_03262;
+
+CREATE TABLE t_parquet_03262 (a UInt64)
+ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
+PARTITION BY a;
+
+INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache';
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = ParquetMetadata)
+SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_format_metadata_cache';
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+WHERE log_comment = 'test_03262_parquet_metadata_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time DESC
+LIMIT 1 SETTINGS use_query_condition_cache=0;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+WHERE log_comment = 'test_03262_parquet_metadata_format_metadata_cache'
+AND type = 'QueryFinish'
+ORDER BY event_time DESC
+LIMIT 1 SETTINGS use_query_condition_cache=0;
+
+SYSTEM DROP PARQUET METADATA CACHE;
+
+SELECT COUNT(*)
+FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
+SETTINGS input_format_parquet_use_metadata_cache=1, use_query_condition_cache=0, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache_cache_empty';
+
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['ParquetMetaDataCacheHits']
+FROM system.query_log
+WHERE log_comment = 'test_03262_parquet_metadata_cache_cache_empty'
+AND type = 'QueryFinish'
+ORDER BY event_time DESC
+LIMIT 1 SETTINGS use_query_condition_cache=0;
+
+SELECT ProfileEvents['ParquetMetaDataCacheMisses']
+FROM system.query_log
+WHERE log_comment = 'test_03262_parquet_metadata_cache_cache_empty'
+AND type = 'QueryFinish'
+ORDER BY event_time DESC
+LIMIT 1 SETTINGS use_query_condition_cache=0;
+
+DROP TABLE t_parquet_03262;