Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ public class Config extends ConfigBase {
"The threshold of slow query, in milliseconds. "
+ "If the response time of a query exceeds this threshold, it will be recorded in audit log."})
public static long qe_slow_log_ms = 5000;
@ConfField(mutable = true, description = {"sql_digest 生成的时间阈值,单位为毫秒。如果一个查询的响应时间超过这个阈值,"
+ "则会为其生成 sql_digest。",
"The threshold of sql_digest generation, in milliseconds. "
+ "If the response time of a query exceeds this threshold, "
+ "sql_digest will be generated for it."})
public static long sql_digest_generation_threshold_ms = 5000;
@ConfField(description = {"FE 审计日志文件的切分周期", "The split cycle of the FE audit log file"},
options = {"DAY", "HOUR"})
public static String audit_log_roll_interval = "DAY";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,11 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme
MetricRepo.updateClusterQueryLatency(physicalClusterName, elapseMs);
}
if (elapseMs > Config.qe_slow_log_ms) {
MetricRepo.COUNTER_QUERY_SLOW.increase(1L);
}
if (elapseMs > Config.sql_digest_generation_threshold_ms) {
String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest());
auditEventBuilder.setSqlDigest(sqlDigest);
MetricRepo.COUNTER_QUERY_SLOW.increase(1L);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_audit_log_behavior") {
suite("test_audit_log_behavior","nonConcurrent") {
try {
sql "set global enable_audit_plugin = true"
sql "set global audit_plugin_max_sql_length = 58"
Expand Down
83 changes: 83 additions & 0 deletions regression-test/suites/audit/test_sql_digest_generation.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_sql_digest_generation", "nonConcurrent") {
sql "set global enable_audit_plugin = true"

sql "drop table if exists audit_log_behavior"
sql """
CREATE TABLE `audit_log_behavior` (
`id` bigint,
`name` varchar(32)
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
)
"""

sql "insert into audit_log_behavior values (1, 'test')"
sql "insert into audit_log_behavior values (2, 'test')"

setFeConfigTemporary([sql_digest_generation_threshold_ms:0]) {
Thread.sleep(10000)
sql """call flush_audit_log()"""

def prevSqlDigestCountResult = sql """SELECT COUNT(*) FROM __internal_schema.audit_log WHERE sql_digest != '';"""
Long prevSqlDigestCount = Long.valueOf(prevSqlDigestCountResult[0][0])
logger.info("prev sql_digest count: " + prevSqlDigestCount)

sql "select * from audit_log_behavior"

Thread.sleep(10000)
sql """call flush_audit_log()"""

def nowSqlDigestCountResult = sql """SELECT COUNT(*) FROM __internal_schema.audit_log WHERE sql_digest != '';"""
Long nowSqlDigestCount = Long.valueOf(nowSqlDigestCountResult[0][0])
logger.info("now sql_digest count: " + nowSqlDigestCount)

assertTrue(nowSqlDigestCount > prevSqlDigestCount, "Count of sql_digest did not increase")
}

Thread.sleep(10000)
sql """call flush_audit_log()"""

setFeConfigTemporary([sql_digest_generation_threshold_ms:100000]) {
sql """call flush_audit_log()"""
Thread.sleep(10000)

def prevSqlDigestCountResult = sql """SELECT COUNT(*) FROM __internal_schema.audit_log WHERE sql_digest != '';"""
Long prevSqlDigestCount = Long.valueOf(prevSqlDigestCountResult[0][0])
logger.info("prev sql_digest count: " + prevSqlDigestCount)

sql "select * from audit_log_behavior"

Thread.sleep(10000)
sql """call flush_audit_log()"""

def nowSqlDigestCountResult = sql """SELECT COUNT(*) FROM __internal_schema.audit_log WHERE sql_digest != '';"""
Long nowSqlDigestCount = Long.valueOf(nowSqlDigestCountResult[0][0])
logger.info("now sql_digest count: " + nowSqlDigestCount)

assertTrue(nowSqlDigestCount == prevSqlDigestCount, "Count of sql_digest changed")
}

Thread.sleep(10000)
sql """call flush_audit_log()"""
}
Loading