diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 77edd1630fc6..2e377689ccfc 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -181,6 +181,7 @@ enum class AccessType : uint8_t M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ + M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \ M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \ M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \ diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index fc0f9594d0b9..955cf59d6e8b 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -74,6 +74,10 @@ #include <Formats/ProtobufSchemas.h> #endif +#if USE_PARQUET +#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h> +#endif + #if USE_AWS_S3 #include <IO/S3/Client.h> #endif @@ -415,6 +419,16 @@ BlockIO InterpreterSystemQuery::execute() break; } + case Type::DROP_PARQUET_METADATA_CACHE: + { +#if USE_PARQUET + getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE); + ParquetFileMetaDataCache::instance()->clear(); + break; +#else + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet"); +#endif + } case Type::DROP_COMPILED_EXPRESSION_CACHE: #if USE_EMBEDDED_COMPILER getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE); @@ -1445,6 +1459,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() case Type::DROP_PAGE_CACHE: case Type::DROP_SCHEMA_CACHE: case Type::DROP_FORMAT_SCHEMA_CACHE: + case Type::DROP_PARQUET_METADATA_CACHE: 
case Type::DROP_S3_CLIENT_CACHE: { required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE); diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 43a100fc57ad..9335766e13b6 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -431,6 +431,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti case Type::DROP_SKIPPING_INDEX_CACHE: case Type::DROP_COMPILED_EXPRESSION_CACHE: case Type::DROP_S3_CLIENT_CACHE: + case Type::DROP_PARQUET_METADATA_CACHE: case Type::RESET_COVERAGE: case Type::RESTART_REPLICAS: case Type::JEMALLOC_PURGE: diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index fb580bf48194..521bb703f467 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -40,6 +40,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster DROP_SCHEMA_CACHE, DROP_FORMAT_SCHEMA_CACHE, DROP_S3_CLIENT_CACHE, + DROP_PARQUET_METADATA_CACHE, STOP_LISTEN, START_LISTEN, RESTART_REPLICAS, diff --git a/tests/integration/test_parquet_drop_metadata_cache/__init__.py b/tests/integration/test_parquet_drop_metadata_cache/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_parquet_drop_metadata_cache/configs/config.d/cluster.xml b/tests/integration/test_parquet_drop_metadata_cache/configs/config.d/cluster.xml new file mode 100644 index 000000000000..1388c0788fc0 --- /dev/null +++ b/tests/integration/test_parquet_drop_metadata_cache/configs/config.d/cluster.xml @@ -0,0 +1,20 @@ + <clickhouse> + <remote_servers> + <parquet_clear_cache_cluster> + <shard> + <replica> + <host>node1</host> + <port>9000</port> + </replica> + <replica> + <host>node2</host> + <port>9000</port> + </replica> + <replica> + <host>node3</host> + <port>9000</port> + </replica> + </shard> + </parquet_clear_cache_cluster> + </remote_servers> + </clickhouse> \ No newline at end of file diff --git a/tests/integration/test_parquet_drop_metadata_cache/test.py b/tests/integration/test_parquet_drop_metadata_cache/test.py new file mode 100644 index 000000000000..03db9c05b081 --- /dev/null +++ b/tests/integration/test_parquet_drop_metadata_cache/test.py @@ -0,0 +1,69 @@ +import pytest +from helpers.cluster 
import ClickHouseCluster +import time + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance("node1", main_configs=["configs/config.d/cluster.xml"], with_zookeeper=True, with_minio=True) +node2 = cluster.add_instance("node2", main_configs=["configs/config.d/cluster.xml"], with_zookeeper=True) +node3 = cluster.add_instance("node3", main_configs=["configs/config.d/cluster.xml"], with_zookeeper=True) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def test_clear_cache_on_cluster(started_cluster): + node1.query("INSERT INTO TABLE FUNCTION s3('http://minio1:9001/root/data/test_clear_cache/{_partition_id}.parquet', 'minio', 'minio123', 'Parquet') PARTITION BY number SELECT number FROM numbers(1, 3)") + + node1.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/1.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cold_cache'") + node2.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/2.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cold_cache'") + node3.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/3.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cold_cache'") + + node1.query("SYSTEM FLUSH LOGS ON CLUSTER parquet_clear_cache_cluster") + + cold_cache_result_n1 = node1.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cold_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + cold_cache_result_n2 = node2.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cold_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + cold_cache_result_n3 = node3.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cold_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + + 
assert(cold_cache_result_n1 == cold_cache_result_n2 == cold_cache_result_n3) + assert(cold_cache_result_n1 == '0\n') + + node1.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/1.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='hot_cache'") + node2.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/2.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='hot_cache'") + node3.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/3.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='hot_cache'") + + node1.query("SYSTEM FLUSH LOGS ON CLUSTER parquet_clear_cache_cluster") + + warm_cache_result_n1 = node1.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'hot_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + warm_cache_result_n2 = node2.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'hot_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + warm_cache_result_n3 = node3.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'hot_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + + assert(warm_cache_result_n1 == warm_cache_result_n2 == warm_cache_result_n3) + assert(warm_cache_result_n1 == '1\n') + + node1.query("SYSTEM DROP PARQUET METADATA CACHE ON CLUSTER parquet_clear_cache_cluster") + + node1.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/1.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cache_after_drop'") + node2.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/2.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cache_after_drop'") + node3.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/3.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cache_after_drop'") + + 
node1.query("SYSTEM FLUSH LOGS ON CLUSTER parquet_clear_cache_cluster") + + cache_after_drop_result_n1 = node1.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + cache_after_drop_result_n2 = node2.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + cache_after_drop_result_n3 = node3.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + + assert(cache_after_drop_result_n1 == cache_after_drop_result_n2 == cache_after_drop_result_n3) + assert(cache_after_drop_result_n1 == '0\n') + + misses_after_drop_result_n1 = node1.query("SELECT ProfileEvents['ParquetMetaDataCacheMisses'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + misses_after_drop_result_n2 = node2.query("SELECT ProfileEvents['ParquetMetaDataCacheMisses'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + misses_after_drop_result_n3 = node3.query("SELECT ProfileEvents['ParquetMetaDataCacheMisses'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;") + + assert(misses_after_drop_result_n1 == misses_after_drop_result_n2 == misses_after_drop_result_n3) + assert(misses_after_drop_result_n1 == '1\n') diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference index f5c1b1de44a4..c87ad9008b60 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference +++ 
b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference @@ -3,3 +3,6 @@ 10 10 10 +10 +0 +10 diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql index 6153ad30b332..2a1934e7c963 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql @@ -36,4 +36,26 @@ AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1; +SYSTEM DROP PARQUET METADATA CACHE; + +SELECT COUNT(*) +FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) +SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache_cache_empty'; + +SYSTEM FLUSH LOGS; + +SELECT ProfileEvents['ParquetMetaDataCacheHits'] +FROM system.query_log +where log_comment = 'test_03262_parquet_metadata_cache_cache_empty' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + +SELECT ProfileEvents['ParquetMetaDataCacheMisses'] +FROM system.query_log +where log_comment = 'test_03262_parquet_metadata_cache_cache_empty' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + DROP TABLE t_parquet_03262;