1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
@@ -181,6 +181,7 @@ enum class AccessType : uint8_t
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
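The new privilege hangs off the SYSTEM_DROP_CACHE group (the last macro argument), so it can be granted either on its own or via the parent group. A minimal sketch of both forms, with an illustrative user name:

-- cache_admin is an illustrative user name
GRANT SYSTEM DROP PARQUET METADATA CACHE ON *.* TO cache_admin;
-- or via the parent group that covers all cache drops:
GRANT SYSTEM DROP CACHE ON *.* TO cache_admin;
SYSTEM DROP PARQUET METADATA CACHE;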
15 changes: 15 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
@@ -74,6 +74,10 @@
#include <Formats/ProtobufSchemas.h>
#endif

#if USE_PARQUET
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#endif

#if USE_AWS_S3
#include <IO/S3/Client.h>
#endif
@@ -415,6 +419,16 @@ BlockIO InterpreterSystemQuery::execute()
            break;
        }

        case Type::DROP_PARQUET_METADATA_CACHE:
        {
#if USE_PARQUET
            getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE);
            ParquetFileMetaDataCache::instance()->clear();
            break;

Review comment: NIT: Maybe throw an exception when USE_PARQUET is not defined, as for USE_EMBEDDED_COMPILER below? (I don't think anyone ever builds ClickHouse without Parquet support.)

#else
            throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet");
#endif
        }
        case Type::DROP_COMPILED_EXPRESSION_CACHE:
#if USE_EMBEDDED_COMPILER
            getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
@@ -1445,6 +1459,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
        case Type::DROP_PAGE_CACHE:
        case Type::DROP_SCHEMA_CACHE:
        case Type::DROP_FORMAT_SCHEMA_CACHE:
        case Type::DROP_PARQUET_METADATA_CACHE:
        case Type::DROP_S3_CLIENT_CACHE:
        {
            required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
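Because getRequiredAccessForDDLOnCluster maps the new type to the group privilege, the distributed form only requires SYSTEM DROP CACHE on each host. A sketch of that form, assuming an illustrative cluster name:

-- my_cluster is an illustrative cluster name
SYSTEM DROP PARQUET METADATA CACHE ON CLUSTER my_cluster;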
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.cpp
@@ -431,6 +431,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
case Type::DROP_SKIPPING_INDEX_CACHE:
case Type::DROP_COMPILED_EXPRESSION_CACHE:
case Type::DROP_S3_CLIENT_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::RESET_COVERAGE:
case Type::RESTART_REPLICAS:
case Type::JEMALLOC_PURGE:
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
@@ -40,6 +40,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
DROP_SCHEMA_CACHE,
DROP_FORMAT_SCHEMA_CACHE,
DROP_S3_CLIENT_CACHE,
DROP_PARQUET_METADATA_CACHE,
STOP_LISTEN,
START_LISTEN,
RESTART_REPLICAS,
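DROP_PARQUET_METADATA_CACHE joins the block of types that formatImpl appears to render as bare keywords with no extra arguments, so the statement should round-trip through the parser unchanged. A quick check of the normalization, assuming a server recent enough to ship the formatQuery function:

SELECT formatQuery('system drop parquet metadata cache');
-- expected output: SYSTEM DROP PARQUET METADATA CACHE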
tests/integration/test_parquet_drop_metadata_cache/__init__.py
Empty file.
20 changes: 20 additions & 0 deletions tests/integration/test_parquet_drop_metadata_cache/configs/config.d/cluster.xml
@@ -0,0 +1,20 @@
<clickhouse>
    <remote_servers>
        <parquet_clear_cache_cluster>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </parquet_clear_cache_cluster>
    </remote_servers>
</clickhouse>
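The cluster is a single shard with three replicas, one per test node. A quick sanity check that each node picked up the configuration (run on any node):

SELECT shard_num, replica_num, host_name
FROM system.clusters
WHERE cluster = 'parquet_clear_cache_cluster';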
69 changes: 69 additions & 0 deletions tests/integration/test_parquet_drop_metadata_cache/test.py
@@ -0,0 +1,69 @@
import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", main_configs=["configs/config.d/cluster.xml"], with_zookeeper=True, with_minio=True)
node2 = cluster.add_instance("node2", main_configs=["configs/config.d/cluster.xml"], with_zookeeper=True)
node3 = cluster.add_instance("node3", main_configs=["configs/config.d/cluster.xml"], with_zookeeper=True)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_clear_cache_on_cluster(started_cluster):
    node1.query("INSERT INTO TABLE FUNCTION s3('http://minio1:9001/root/data/test_clear_cache/{_partition_id}.parquet', 'minio', 'minio123', 'Parquet') PARTITION BY number SELECT number FROM numbers(1, 3)")

    node1.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/1.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cold_cache'")
    node2.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/2.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cold_cache'")
    node3.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/3.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cold_cache'")

    node1.query("SYSTEM FLUSH LOGS ON CLUSTER parquet_clear_cache_cluster")

    cold_cache_result_n1 = node1.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cold_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")
    cold_cache_result_n2 = node2.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cold_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")
    cold_cache_result_n3 = node3.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cold_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")

    assert cold_cache_result_n1 == cold_cache_result_n2 == cold_cache_result_n3
    assert cold_cache_result_n1 == '0\n'

    node1.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/1.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='hot_cache'")
    node2.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/2.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='hot_cache'")
    node3.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/3.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='hot_cache'")

    node1.query("SYSTEM FLUSH LOGS ON CLUSTER parquet_clear_cache_cluster")

    warm_cache_result_n1 = node1.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'hot_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")
    warm_cache_result_n2 = node2.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'hot_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")
    warm_cache_result_n3 = node3.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'hot_cache' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")

    assert warm_cache_result_n1 == warm_cache_result_n2 == warm_cache_result_n3
    assert warm_cache_result_n1 == '1\n'

    node1.query("SYSTEM DROP PARQUET METADATA CACHE ON CLUSTER parquet_clear_cache_cluster")

    node1.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/1.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cache_after_drop'")
    node2.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/2.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cache_after_drop'")
    node3.query("SELECT * FROM s3('http://minio1:9001/root/data/test_clear_cache/3.parquet', 'minio', 'minio123', 'Parquet') SETTINGS log_comment='cache_after_drop'")

    node1.query("SYSTEM FLUSH LOGS ON CLUSTER parquet_clear_cache_cluster")

    cache_after_drop_result_n1 = node1.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")
    cache_after_drop_result_n2 = node2.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")
    cache_after_drop_result_n3 = node3.query("SELECT ProfileEvents['ParquetMetaDataCacheHits'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")

    assert cache_after_drop_result_n1 == cache_after_drop_result_n2 == cache_after_drop_result_n3
    assert cache_after_drop_result_n1 == '0\n'

    misses_after_drop_result_n1 = node1.query("SELECT ProfileEvents['ParquetMetaDataCacheMisses'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")
    misses_after_drop_result_n2 = node2.query("SELECT ProfileEvents['ParquetMetaDataCacheMisses'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")
    misses_after_drop_result_n3 = node3.query("SELECT ProfileEvents['ParquetMetaDataCacheMisses'] FROM system.query_log where log_comment = 'cache_after_drop' AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1;")

    assert misses_after_drop_result_n1 == misses_after_drop_result_n2 == misses_after_drop_result_n3
    assert misses_after_drop_result_n1 == '1\n'
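Each assertion above repeats the same query_log lookup per node. When debugging by hand, a consolidated view of hits and misses across all three phases may be more convenient; a sketch using the same log_comment markers as the test:

SELECT log_comment,
       ProfileEvents['ParquetMetaDataCacheHits'] AS hits,
       ProfileEvents['ParquetMetaDataCacheMisses'] AS misses
FROM system.query_log
WHERE type = 'QueryFinish'
  AND log_comment IN ('cold_cache', 'hot_cache', 'cache_after_drop')
ORDER BY event_time DESC;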
@@ -3,3 +3,6 @@
10
10
10
10
0
10
@@ -36,4 +36,26 @@ AND type = 'QueryFinish'
ORDER BY event_time desc
LIMIT 1;

SYSTEM DROP PARQUET METADATA CACHE;

SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache_cache_empty';

SYSTEM FLUSH LOGS;

SELECT ProfileEvents['ParquetMetaDataCacheHits']
FROM system.query_log
where log_comment = 'test_03262_parquet_metadata_cache_cache_empty'
AND type = 'QueryFinish'
ORDER BY event_time desc
LIMIT 1;

SELECT ProfileEvents['ParquetMetaDataCacheMisses']
FROM system.query_log
where log_comment = 'test_03262_parquet_metadata_cache_cache_empty'
AND type = 'QueryFinish'
ORDER BY event_time desc
LIMIT 1;

DROP TABLE t_parquet_03262;
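Beyond the per-query ProfileEvents in query_log, the cumulative counters are also visible in system.events, which makes for a quick interactive check that the cache is being hit or dropped (not part of the test itself):

SELECT event, value
FROM system.events
WHERE event LIKE 'ParquetMetaDataCache%';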