From 8b9b89bed982d3a485774363b96d27febc8ce880 Mon Sep 17 00:00:00 2001 From: xuchenhao <48084123+xuchenhao@users.noreply.github.com> Date: Wed, 17 Dec 2025 17:40:42 +0800 Subject: [PATCH] [feature](audit) add dynamic configuration for sql_digest generation (#59102) To analyze SQL patterns across the entire Doris cluster using sql_digest and implement workload optimization based on SQL patterns, a more flexible sql_digest generation mechanism is required. Previously, sql_digest generation was implicitly tied to the `qe_slow_log_ms` configuration - digests were only generated for queries considered "slow". This coupling limited flexibility in audit logging. This PR introduces a new dynamic configuration `sql_digest_generation_threshold_ms` to optimize the generation of sql_digest in audit logging. ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test - [x] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [x] No. - [ ] Yes. - Does this need documentation? - [x] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- .../java/org/apache/doris/common/Config.java | 6 ++ .../org/apache/doris/qe/AuditLogHelper.java | 4 +- .../audit/test_audit_log_behavior.groovy | 2 +- .../audit/test_sql_digest_generation.groovy | 83 +++++++++++++++++++ 4 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/audit/test_sql_digest_generation.groovy diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index b34dde248c6dd8..ccc512e43ba119 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -120,6 +120,12 @@ public class Config extends ConfigBase { "The threshold of slow query, in milliseconds. " + "If the response time of a query exceeds this threshold, it will be recorded in audit log."}) public static long qe_slow_log_ms = 5000; + @ConfField(mutable = true, description = {"sql_digest 生成的时间阈值,单位为毫秒。如果一个查询的响应时间超过这个阈值," + + "则会为其生成 sql_digest。", + "The threshold of sql_digest generation, in milliseconds. " + + "If the response time of a query exceeds this threshold, " + + "sql_digest will be generated for it."}) + public static long sql_digest_generation_threshold_ms = 5000; @ConfField(description = {"FE 审计日志文件的切分周期", "The split cycle of the FE audit log file"}, options = {"DAY", "HOUR"}) public static String audit_log_roll_interval = "DAY"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index ad6c3bee6d0dae..f43d729e69a919 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -354,9 +354,11 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme MetricRepo.updateClusterQueryLatency(physicalClusterName, elapseMs); } if (elapseMs > Config.qe_slow_log_ms) { + MetricRepo.COUNTER_QUERY_SLOW.increase(1L); + } + if (elapseMs > Config.sql_digest_generation_threshold_ms) { String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); auditEventBuilder.setSqlDigest(sqlDigest); - MetricRepo.COUNTER_QUERY_SLOW.increase(1L); } } } diff --git a/regression-test/suites/audit/test_audit_log_behavior.groovy b/regression-test/suites/audit/test_audit_log_behavior.groovy index 8337c249bb30ca..a20e0ef9271f5e 100644 --- a/regression-test/suites/audit/test_audit_log_behavior.groovy +++ b/regression-test/suites/audit/test_audit_log_behavior.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_audit_log_behavior") { +suite("test_audit_log_behavior","nonConcurrent") { try { sql "set global enable_audit_plugin = true" sql "set global audit_plugin_max_sql_length = 58" diff --git a/regression-test/suites/audit/test_sql_digest_generation.groovy b/regression-test/suites/audit/test_sql_digest_generation.groovy new file mode 100644 index 00000000000000..f02f89da20e3e3 --- /dev/null +++ b/regression-test/suites/audit/test_sql_digest_generation.groovy @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_sql_digest_generation", "nonConcurrent") { + sql "set global enable_audit_plugin = true" + + sql "drop table if exists audit_log_behavior" + sql """ + CREATE TABLE `audit_log_behavior` ( + `id` bigint, + `name` varchar(32) + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ) + """ + + sql "insert into audit_log_behavior values (1, 'test')" + sql "insert into audit_log_behavior values (2, 'test')" + + setFeConfigTemporary([sql_digest_generation_threshold_ms:0]) { + Thread.sleep(10000) + sql """call flush_audit_log()""" + + def prevSqlDigestCountResult = sql """SELECT COUNT(*) FROM __internal_schema.audit_log WHERE sql_digest != '';""" + Long prevSqlDigestCount = Long.valueOf(prevSqlDigestCountResult[0][0]) + logger.info("prev sql_digest count: " + prevSqlDigestCount) + + sql "select * from audit_log_behavior" + + Thread.sleep(10000) + sql """call flush_audit_log()""" + + def nowSqlDigestCountResult = sql """SELECT COUNT(*) FROM __internal_schema.audit_log WHERE sql_digest != '';""" + Long nowSqlDigestCount = Long.valueOf(nowSqlDigestCountResult[0][0]) + logger.info("now sql_digest count: " + nowSqlDigestCount) + + assertTrue(nowSqlDigestCount > prevSqlDigestCount, "Count of sql_digest did not increase") + } + + Thread.sleep(10000) + sql """call flush_audit_log()""" + + setFeConfigTemporary([sql_digest_generation_threshold_ms:100000]) { + sql """call flush_audit_log()""" + Thread.sleep(10000) + + def prevSqlDigestCountResult = sql """SELECT COUNT(*) FROM __internal_schema.audit_log WHERE sql_digest != '';""" + Long prevSqlDigestCount = Long.valueOf(prevSqlDigestCountResult[0][0]) + logger.info("prev sql_digest count: " + prevSqlDigestCount) + + sql "select * from audit_log_behavior" + + Thread.sleep(10000) + sql """call flush_audit_log()""" + + def nowSqlDigestCountResult = sql """SELECT COUNT(*) FROM __internal_schema.audit_log WHERE sql_digest != '';""" + Long nowSqlDigestCount = Long.valueOf(nowSqlDigestCountResult[0][0]) + logger.info("now sql_digest count: " + nowSqlDigestCount) + + assertTrue(nowSqlDigestCount == prevSqlDigestCount, "Count of sql_digest changed") + } + + Thread.sleep(10000) + sql """call flush_audit_log()""" +}