From 3ac5403ca5bc1845dac177054034f6bf17d5f34e Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Wed, 5 Feb 2025 22:47:48 +0100 Subject: [PATCH 01/11] Merge pull request #586 from Altinity/metadata_cache_for_parquet_24_12_2 Parquet File Metadata caching implementation --- src/Common/ProfileEvents.cpp | 6 +- src/Core/FormatFactorySettings.h | 2 +- src/Core/ServerSettings.cpp | 5 +- src/Processors/Formats/IInputFormat.h | 3 + .../Formats/Impl/ParquetBlockInputFormat.cpp | 85 ++++++++++++++++++- .../Formats/Impl/ParquetBlockInputFormat.h | 24 ++++++ .../StorageObjectStorageSource.cpp | 3 + ...et_object_storage_metadata_cache.reference | 3 + ..._parquet_object_storage_metadata_cache.sql | 28 ++++++ 9 files changed, 152 insertions(+), 7 deletions(-) create mode 100644 tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference create mode 100644 tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 809736ceb36e..1bdcec97d1cf 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -953,8 +953,10 @@ The server successfully detected this situation and will download merged part fr M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background", ValueType::Number) \ M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \ \ - M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \ - + M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \ + \ + M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \ + M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \ #ifdef APPLY_FOR_EXTERNAL_EVENTS #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) diff --git a/src/Core/FormatFactorySettings.h b/src/Core/FormatFactorySettings.h index f5f2482b2e50..0241733b2fc2 100644 --- a/src/Core/FormatFactorySettings.h +++ b/src/Core/FormatFactorySettings.h @@ -1313,7 +1313,7 @@ Set the quoting rule for identifiers in SHOW CREATE query DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"( Set the quoting style for identifiers in SHOW CREATE query )", 0) \ - + DECLARE(Bool, input_format_parquet_use_metadata_cache, false, R"(Enable parquet file metadata caching)", 0) \ // End of FORMAT_FACTORY_SETTINGS #define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \ diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index e3923a37ea93..f66c3e588f5f 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -999,9 +999,8 @@ namespace DB true ``` )", 0) \ - DECLARE(Bool, storage_shared_set_join_use_inner_uuid, false, "If enabled, an inner UUID is generated during the creation of SharedSet and SharedJoin. ClickHouse Cloud only", 0) - - + DECLARE(Bool, storage_shared_set_join_use_inner_uuid, false, "If enabled, an inner UUID is generated during the creation of SharedSet and SharedJoin. ClickHouse Cloud only", 0) \ + DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \ // clang-format on /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index e4f9fc318f55..900d4edc39f1 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -67,6 +67,9 @@ class IInputFormat : public SourceWithKeyCondition void needOnlyCount() { need_only_count = true; } + /// Set additional info/key/id related to underlying storage of the ReadBuffer + virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {} + protected: ReadBuffer & getReadBuffer() const { chassert(in); return *in; } diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 927387227e24..d7bcce95a2fa 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -3,6 +3,9 @@ #if USE_PARQUET #include +#include +#include +#include #include #include #include @@ -35,6 +38,8 @@ namespace ProfileEvents { extern const Event ParquetFetchWaitTimeMicroseconds; + extern const Event ParquetMetaDataCacheHits; + extern const Event ParquetMetaDataCacheMisses; } namespace CurrentMetrics @@ -51,6 +56,16 @@ namespace CurrentMetrics namespace DB { +namespace Setting +{ + extern const SettingsBool input_format_parquet_use_metadata_cache; +} + +namespace ServerSetting +{ + extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size; +} + namespace ErrorCodes { extern const int BAD_ARGUMENTS; @@ -510,6 +525,58 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa return hyperrectangle; } +ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes) + : CacheBase(max_size_bytes) {} + +ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes) +{ + static ParquetFileMetaDataCache instance(max_size_bytes); + return &instance; +} + +std::shared_ptr ParquetBlockInputFormat::readMetadataFromFile() +{ + createArrowFileIfNotCreated(); + return parquet::ReadMetaData(arrow_file); +} + +std::shared_ptr ParquetBlockInputFormat::getFileMetaData() +{ + // in-memory cache is not implemented for local file operations, only for remote files + // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation + // and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key + if (!metadata_cache.use_cache || metadata_cache.key.empty()) + { + return readMetadataFromFile(); + } + + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes)->getOrSet( + metadata_cache.key, + [&]() + { + return readMetadataFromFile(); + } + ); + if (loaded) + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses); + else + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits); + return parquet_file_metadata; +} + +void ParquetBlockInputFormat::createArrowFileIfNotCreated() +{ + if (arrow_file) + { + return; + } + + // Create arrow file adapter. + // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that + // we'll need to read (which we know in advance). Use max_download_threads for that. + arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); +} + std::unordered_set getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn) { std::unordered_set column_keys; @@ -609,7 +676,7 @@ void ParquetBlockInputFormat::initializeIfNeeded() if (is_stopped) return; - metadata = parquet::ReadMetaData(arrow_file); + metadata = getFileMetaData(); const bool prefetch_group = supportPrefetch(); std::shared_ptr schema; @@ -709,6 +776,8 @@ void ParquetBlockInputFormat::initializeIfNeeded() } } + bool has_row_groups_to_read = false; + auto skip_row_group_based_on_filters = [&](int row_group) { if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down) @@ -758,9 +827,23 @@ void ParquetBlockInputFormat::initializeIfNeeded() row_group_batches.back().total_bytes_compressed += row_group_size; auto rows = adaptive_chunk_size(row_group); row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size; + + has_row_groups_to_read = true; + } + + if (has_row_groups_to_read) + { + createArrowFileIfNotCreated(); } } +void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) +{ + metadata_cache.key = key_; + metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; + metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]; +} + void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) { const bool row_group_prefetch = supportPrefetch(); diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index f941bb70ebce..d9a8d82e5cd6 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -2,6 +2,7 @@ #include "config.h" #if USE_PARQUET +#include #include #include #include @@ -72,6 +73,8 @@ class ParquetBlockInputFormat : public IInputFormat size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; } + void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override; + private: Chunk read() override; @@ -90,6 +93,11 @@ class ParquetBlockInputFormat : public IInputFormat void threadFunction(size_t row_group_batch_idx); + void createArrowFileIfNotCreated(); + std::shared_ptr readMetadataFromFile(); + + std::shared_ptr getFileMetaData(); + inline bool supportPrefetch() const; // Data layout in the file: @@ -338,6 +346,12 @@ class ParquetBlockInputFormat : public IInputFormat std::exception_ptr background_exception = nullptr; std::atomic is_stopped{0}; bool is_initialized = false; + struct Cache + { + String key; + bool use_cache = false; + UInt64 max_size_bytes{0}; + } metadata_cache; }; class ParquetSchemaReader : public ISchemaReader @@ -356,6 +370,16 @@ class ParquetSchemaReader : public ISchemaReader std::shared_ptr metadata; }; +class ParquetFileMetaDataCache : public CacheBase +{ +public: + static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes); + void clear() {} + +private: + ParquetFileMetaDataCache(UInt64 max_size_bytes); +}; + } #endif diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index c779a3cd7622..826081a6b032 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -477,6 +477,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (need_only_count) input_format->needOnlyCount(); + if (!object_info->getPath().empty()) + input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag); + builder.init(Pipe(input_format)); if (auto transformer = configuration->getSchemaTransformer(object_info->getPath())) diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference new file mode 100644 index 000000000000..51fdf048b8ac --- /dev/null +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference @@ -0,0 +1,3 @@ +10 +10 +10 diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql new file mode 100644 index 000000000000..f453dcba0c66 --- /dev/null +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql @@ -0,0 +1,28 @@ +-- Tags: no-parallel, no-fasttest + +DROP TABLE IF EXISTS t_parquet_03262; + +CREATE TABLE t_parquet_03262 (a UInt64) +ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet) +PARTITION BY a; + +INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1; + +SELECT COUNT(*) +FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) +SETTINGS input_format_parquet_use_metadata_cache=1; + +SELECT COUNT(*) +FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) +SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_cache'; + +SYSTEM FLUSH LOGS; + +SELECT ProfileEvents['ParquetMetaDataCacheHits'] +FROM system.query_log +where log_comment = 'test_03262_parquet_metadata_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +DROP TABLE t_parquet_03262; From 93ca792af892dc55eb9bf91639ddb04683c505d7 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 27 Feb 2025 11:17:45 -0300 Subject: [PATCH 02/11] fix flaky test --- .../03299_parquet_object_storage_metadata_cache.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql index f453dcba0c66..6882acac2ae7 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql @@ -10,11 +10,11 @@ INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_ SELECT COUNT(*) FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) -SETTINGS input_format_parquet_use_metadata_cache=1; +SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0; SELECT COUNT(*) FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) -SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_cache'; +SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache'; SYSTEM FLUSH LOGS; From 000d75e00058d06ffeeadee4ccc08784766880c8 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 18 Feb 2025 15:52:39 -0300 Subject: [PATCH 03/11] use parquet metadata cache for parquetmetadata format as well --- programs/server/CMakeLists.txt | 4 ++ programs/server/Server.cpp | 6 +++ src/Processors/Formats/IInputFormat.h | 2 +- .../Formats/Impl/ParquetBlockInputFormat.cpp | 15 ++---- .../Formats/Impl/ParquetBlockInputFormat.h | 18 ++----- .../Formats/Impl/ParquetFileMetaDataCache.cpp | 20 +++++++ .../Formats/Impl/ParquetFileMetaDataCache.h | 24 +++++++++ .../Impl/ParquetMetadataInputFormat.cpp | 53 +++++++++++++++++-- .../Formats/Impl/ParquetMetadataInputFormat.h | 10 ++++ .../StorageObjectStorageSource.cpp | 2 +- 10 files changed, 122 insertions(+), 32 deletions(-) create mode 100644 src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp create mode 100644 src/Processors/Formats/Impl/ParquetFileMetaDataCache.h diff --git a/programs/server/CMakeLists.txt b/programs/server/CMakeLists.txt index d3565211d141..b05068d58115 100644 --- a/programs/server/CMakeLists.txt +++ b/programs/server/CMakeLists.txt @@ -25,6 +25,10 @@ if (TARGET ch_contrib::azure_sdk) list(APPEND CLICKHOUSE_SERVER_LINK PRIVATE ch_contrib::azure_sdk) endif() +if (TARGET ch_contrib::parquet) + list(APPEND CLICKHOUSE_SERVER_LINK PRIVATE ch_contrib::parquet) +endif () + clickhouse_program_add(server) install(FILES config.xml users.xml DESTINATION "${CLICKHOUSE_ETC_DIR}/clickhouse-server" COMPONENT clickhouse) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index ec830435a3d2..c41a8d9a3ff6 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -154,6 +154,10 @@ # include #endif +#if USE_PARQUET +# include +#endif + #include /// A minimal file used when the server is run without installation @@ -309,6 +313,7 @@ namespace ServerSetting extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_size; extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_free_size; extern const ServerSettingsUInt64 prefixes_deserialization_thread_pool_thread_pool_queue_size; + extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size; } } @@ -2306,6 +2311,7 @@ try dns_cache_updater->start(); auto replicas_reconnector = ReplicasReconnector::init(global_context); + ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]); /// Set current database name before loading tables and databases because /// system logs may copy global context. diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index 900d4edc39f1..d4416f80ba17 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -68,7 +68,7 @@ class IInputFormat : public SourceWithKeyCondition void needOnlyCount() { need_only_count = true; } /// Set additional info/key/id related to underlying storage of the ReadBuffer - virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {} + virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {} protected: ReadBuffer & getReadBuffer() const { chassert(in); return *in; } diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index d7bcce95a2fa..34660de2d401 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -525,15 +526,6 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa return hyperrectangle; } -ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes) - : CacheBase(max_size_bytes) {} - -ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes) -{ - static ParquetFileMetaDataCache instance(max_size_bytes); - return &instance; -} - std::shared_ptr ParquetBlockInputFormat::readMetadataFromFile() { createArrowFileIfNotCreated(); @@ -550,7 +542,7 @@ std::shared_ptr ParquetBlockInputFormat::getFileMetaData( return readMetadataFromFile(); } - auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes)->getOrSet( + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet( metadata_cache.key, [&]() { @@ -837,11 +829,10 @@ void ParquetBlockInputFormat::initializeIfNeeded() } } -void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) +void ParquetBlockInputFormat::setStorageRelatedUniqueKey( const Settings & settings, const String & key_) { metadata_cache.key = key_; metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; - metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]; } void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index d9a8d82e5cd6..a1b7eade4303 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -2,7 +2,6 @@ #include "config.h" #if USE_PARQUET -#include #include #include #include @@ -73,7 +72,7 @@ class ParquetBlockInputFormat : public IInputFormat size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; } - void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override; + void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; private: Chunk read() override; @@ -350,8 +349,9 @@ class ParquetBlockInputFormat : public IInputFormat { String key; bool use_cache = false; - UInt64 max_size_bytes{0}; - } metadata_cache; + }; + + Cache metadata_cache; }; class ParquetSchemaReader : public ISchemaReader @@ -370,16 +370,6 @@ class ParquetSchemaReader : public ISchemaReader std::shared_ptr metadata; }; -class ParquetFileMetaDataCache : public CacheBase -{ -public: - static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes); - void clear() {} - -private: - ParquetFileMetaDataCache(UInt64 max_size_bytes); -}; - } #endif diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp new file mode 100644 index 000000000000..85808f90543b --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp @@ -0,0 +1,20 @@ +#include + +#ifdef USE_PARQUET + +namespace DB +{ + +ParquetFileMetaDataCache::ParquetFileMetaDataCache() + : CacheBase(0) +{} + +ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance() +{ + static ParquetFileMetaDataCache instance; + return &instance; +} + +} + +#endif diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h new file mode 100644 index 000000000000..5cf1e07ed281 --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h @@ -0,0 +1,24 @@ +#pragma once + +#include "config.h" + +#if USE_PARQUET + +#include +#include + +namespace DB +{ + +class ParquetFileMetaDataCache : public CacheBase +{ +public: + static ParquetFileMetaDataCache * instance(); + +private: + ParquetFileMetaDataCache(); +}; + +} + +#endif diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp index 428e8c928913..792cee937db2 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp @@ -22,8 +22,17 @@ #include #include "ArrowBufferedStreams.h" #include +#include +#include +#include +namespace ProfileEvents +{ +extern const Event ParquetMetaDataCacheHits; +extern const Event ParquetMetaDataCacheMisses; +} + namespace DB { @@ -32,6 +41,11 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace Setting +{ +extern const SettingsBool input_format_parquet_use_metadata_cache; +} + static NamesAndTypesList getHeaderForParquetMetadata() { NamesAndTypesList names_and_types{ @@ -130,10 +144,35 @@ void checkHeader(const Block & header) static std::shared_ptr getFileMetadata( ReadBuffer & in, const FormatSettings & format_settings, - std::atomic & is_stopped) + std::atomic & is_stopped, + ParquetMetadataInputFormat::Cache metadata_cache) { - auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); - return parquet::ReadMetaData(arrow_file); + // in-memory cache is not implemented for local file operations, only for remote files + // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation + // and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key + if (!metadata_cache.use_cache || metadata_cache.key.empty()) + { + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + return parquet::ReadMetaData(arrow_file); + } + + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet( + metadata_cache.key, + [&]() + { + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + return parquet::ReadMetaData(arrow_file); + } + ); + + if (loaded) + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses); + else + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits); + + return parquet_file_metadata; + + } ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_) @@ -148,7 +187,7 @@ Chunk ParquetMetadataInputFormat::read() if (done) return res; - auto metadata = getFileMetadata(*in, format_settings, is_stopped); + auto metadata = getFileMetadata(*in, format_settings, is_stopped, metadata_cache); const auto & header = getPort().getHeader(); auto names_and_types = getHeaderForParquetMetadata(); @@ -488,6 +527,12 @@ void ParquetMetadataInputFormat::resetParser() done = false; } +void ParquetMetadataInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_) +{ + metadata_cache.key = key_; + metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; +} + ParquetMetadataSchemaReader::ParquetMetadataSchemaReader(ReadBuffer & in_) : ISchemaReader(in_) { diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h index 5d2d89898596..5cd3f4173bf5 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h @@ -62,6 +62,14 @@ class ParquetMetadataInputFormat : public IInputFormat void resetParser() override; + void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; + + struct Cache + { + String key; + bool use_cache = false; + }; + private: Chunk read() override; @@ -78,6 +86,8 @@ class ParquetMetadataInputFormat : public IInputFormat const FormatSettings format_settings; bool done = false; std::atomic is_stopped{0}; + + Cache metadata_cache; }; class ParquetMetadataSchemaReader : public ISchemaReader diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 826081a6b032..03dbe681bf7b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -478,7 +478,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade input_format->needOnlyCount(); if (!object_info->getPath().empty()) - input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag); + input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag); builder.init(Pipe(input_format)); From 04399bc28c744b22a2503efece53662fe35aea69 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 18 Feb 2025 16:17:15 -0300 Subject: [PATCH 04/11] add test, see what CICD says --- .../03299_parquet_object_storage_metadata_cache.reference | 1 + .../03299_parquet_object_storage_metadata_cache.sql | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference index 51fdf048b8ac..6623d42f4b49 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference @@ -1,3 +1,4 @@ 10 10 10 +20 diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql index 6882acac2ae7..eb07a8bfceb1 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql @@ -16,6 +16,10 @@ SELECT COUNT(*) FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache'; +SELECT COUNT(*) +FROM s3(s3_conn, filename = 'test_03262_*', format = ParquetMetadata) +SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_format_metadata_cache'; + SYSTEM FLUSH LOGS; SELECT ProfileEvents['ParquetMetaDataCacheHits'] From cbc5c5f7a9af5ae9e67426e10978f6421ffdcd14 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 18 Feb 2025 17:35:55 -0300 Subject: [PATCH 05/11] simplify dependencies by forward declaring parquet structure --- programs/server/CMakeLists.txt | 4 ---- src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp | 2 +- src/Processors/Formats/Impl/ParquetFileMetaDataCache.h | 8 +++++++- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/programs/server/CMakeLists.txt b/programs/server/CMakeLists.txt index b05068d58115..d3565211d141 100644 --- a/programs/server/CMakeLists.txt +++ b/programs/server/CMakeLists.txt @@ -25,10 +25,6 @@ if (TARGET ch_contrib::azure_sdk) list(APPEND CLICKHOUSE_SERVER_LINK PRIVATE ch_contrib::azure_sdk) endif() -if (TARGET ch_contrib::parquet) - list(APPEND CLICKHOUSE_SERVER_LINK PRIVATE ch_contrib::parquet) -endif () - clickhouse_program_add(server) install(FILES config.xml users.xml DESTINATION "${CLICKHOUSE_ETC_DIR}/clickhouse-server" COMPONENT clickhouse) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 34660de2d401..dd30a586a58b 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -829,7 +829,7 @@ void ParquetBlockInputFormat::initializeIfNeeded() } } -void ParquetBlockInputFormat::setStorageRelatedUniqueKey( const Settings & settings, const String & key_) +void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_) { metadata_cache.key = key_; metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h index 5cf1e07ed281..fb5fc1bb0217 100644 --- a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h +++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h @@ -4,7 +4,13 @@ #if USE_PARQUET -#include +namespace parquet +{ + +class FileMetaData; + +} + #include namespace DB From a649737065205f651a43e41850501735f1bb87c8 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 19 Feb 2025 09:23:02 -0300 Subject: [PATCH 06/11] fix tests --- .../03299_parquet_object_storage_metadata_cache.reference | 3 ++- .../03299_parquet_object_storage_metadata_cache.sql | 7 +++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference index 6623d42f4b49..f5c1b1de44a4 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference @@ -1,4 +1,5 @@ 10 10 10 -20 +10 +10 diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql index eb07a8bfceb1..6153ad30b332 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql @@ -29,4 +29,11 @@ AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1; +SELECT ProfileEvents['ParquetMetaDataCacheHits'] +FROM system.query_log +where log_comment = 'test_03262_parquet_metadata_format_metadata_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + DROP TABLE t_parquet_03262; From 0914a057847a52fc4156d3f76e4434134ccffe4f Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 6 Mar 2025 20:05:38 -0300 Subject: [PATCH 07/11] Metadata cache on by default part 2 --- src/Core/FormatFactorySettings.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/FormatFactorySettings.h b/src/Core/FormatFactorySettings.h index 0241733b2fc2..b4579c5fae64 100644 --- a/src/Core/FormatFactorySettings.h +++ b/src/Core/FormatFactorySettings.h @@ -1313,7 +1313,7 @@ Set the quoting rule for identifiers in SHOW CREATE query DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"( Set the quoting style for identifiers in SHOW CREATE query )", 0) \ - DECLARE(Bool, input_format_parquet_use_metadata_cache, false, R"(Enable parquet file metadata caching)", 0) \ + DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable parquet file metadata caching)", 0) \ // End of FORMAT_FACTORY_SETTINGS #define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \ From 707d2a483ba719af7c045832e1888ba9c597287b Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 6 Mar 2025 20:34:59 -0300 Subject: [PATCH 08/11] Update SettingsChangesHistory.cpp --- src/Core/SettingsChangesHistory.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index dc6809ffa283..c6fc11c15a61 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -149,6 +149,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// Release closed. Please use 25.1 }); addSettingsChanges(settings_changes_history, "24.11", + {"object_storage_cluster", "", "", "New setting"}, + {"object_storage_max_nodes", 0, 0, "New setting"}, + {"input_format_parquet_use_metadata_cache", 0, 1, "Optimization by default"}, + } + }, + {"24.11", { {"validate_mutation_query", false, true, "New setting to validate mutation queries by default."}, {"enable_job_stack_trace", false, true, "Enable by default collecting stack traces from job's scheduling."}, From 4877cd114098f11ba51829957c514d9eef505afe Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Fri, 7 Mar 2025 00:41:56 +0100 Subject: [PATCH 09/11] Update SettingsChangesHistory.cpp --- src/Core/SettingsChangesHistory.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index c6fc11c15a61..f7f6725f0040 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -83,6 +83,14 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"s3_allow_multipart_copy", true, true, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.1", + {"24.12.2.20000", + // Altinity Antalya modifications + { + {"input_format_parquet_use_metadata_cache", false, false, "New setting"}, // https://github.com/Altinity/ClickHouse/pull/586 + {"input_format_parquet_use_metadata_cache", false, true, "Turn optimization ON by default"}, + } + }, + {"24.12", { /// Release closed. Please use 25.2 {"allow_not_comparable_types_in_order_by", true, false, "Don't allow not comparable types in order by by default"}, @@ -151,7 +159,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() addSettingsChanges(settings_changes_history, "24.11", {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, - {"input_format_parquet_use_metadata_cache", 0, 1, "Optimization by default"}, } }, {"24.11", From 89196b71ca3fc58f98dd8f88a11dfdc743605b2b Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Fri, 7 Mar 2025 00:42:55 +0100 Subject: [PATCH 10/11] Update SettingsChangesHistory.cpp --- src/Core/SettingsChangesHistory.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index f7f6725f0040..fa30249a9319 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -86,8 +86,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"24.12.2.20000", // Altinity Antalya modifications { - {"input_format_parquet_use_metadata_cache", false, false, "New setting"}, // https://github.com/Altinity/ClickHouse/pull/586 - {"input_format_parquet_use_metadata_cache", false, true, "Turn optimization ON by default"}, + {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586 } }, {"24.12", From 01e4f46b94c97e3788428213b741cb0bebd819c1 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Sun, 6 Apr 2025 11:06:18 -0300 Subject: [PATCH 11/11] rebase --- src/Core/ServerSettings.cpp | 1 - src/Core/SettingsChangesHistory.cpp | 7 ------- src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp | 5 ----- 3 files changed, 13 deletions(-) diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index b406776eb7aa..2b09caed76cb 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -1002,7 +1002,6 @@ namespace DB DECLARE(Bool, storage_shared_set_join_use_inner_uuid, false, "If enabled, an inner UUID is generated during the creation of SharedSet and SharedJoin. ClickHouse Cloud only", 0) \ DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \ - // clang-format on /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 2170620800a0..cd431ae934db 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -88,13 +88,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"s3_allow_multipart_copy", true, true, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.1", - {"24.12.2.20000", - // Altinity Antalya modifications - { - {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586 - } - }, - {"24.12", { /// Release closed. Please use 25.2 {"allow_not_comparable_types_in_order_by", true, false, "Don't allow not comparable types in order by by default"}, diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 060835184abb..1ca7bba909b5 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -6,9 +6,6 @@ #include #include #include -#include -#include -#include #include #include #include @@ -771,8 +768,6 @@ void ParquetBlockInputFormat::initializeIfNeeded() } } - bool has_row_groups_to_read = false; - auto skip_row_group_based_on_filters = [&](int row_group) { if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down)