From dabb4acc987a4fe56bd3dc9be4026aec9f44f8fd Mon Sep 17 00:00:00 2001 From: wangbo <506340561@qq.com> Date: Mon, 15 Jul 2024 11:51:54 +0800 Subject: [PATCH] Add audit log event queue size limit --- .../java/org/apache/doris/common/Config.java | 4 ++ .../apache/doris/qe/AuditEventProcessor.java | 13 ++++- .../WorkloadRuntimeStatusMgr.java | 47 ++++++++++++++----- 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 5756d8fbfc2ee5..30bcfa7b292e90 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2899,6 +2899,10 @@ public static int metaServiceRpcRetryTimes() { @ConfField(mutable = true) public static boolean enable_cloud_running_txn_check = true; + //* audit_event_log_queue_size = qps * query_audit_log_timeout_ms + @ConfField(mutable = true) + public static int audit_event_log_queue_size = 250000; + @ConfField(description = {"存算分离模式下streamload导入使用的转发策略, 可选值为public-private或者空", "streamload route policy in cloud mode, availale options are public-private and empty string"}) public static String streamload_redirect_policy = ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java index ed0d028315610d..e2c45ae39ae23b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java @@ -70,12 +70,21 @@ public void stop() { } } - public void handleAuditEvent(AuditEvent auditEvent) { + public boolean handleAuditEvent(AuditEvent auditEvent) { + return handleAuditEvent(auditEvent, false); + } + + public boolean handleAuditEvent(AuditEvent auditEvent, boolean ignoreQueueFullLog) { + boolean isAddSucc = true; try { eventQueue.add(auditEvent); } catch (Exception e) { - LOG.warn("encounter exception when handle audit event, ignore", e); + isAddSucc = false; + if (!ignoreQueueFullLog) { + LOG.warn("encounter exception when handle audit event {}, ignore", auditEvent.type, e); + } } + return isAddSucc; } public class Worker implements Runnable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index 07955ede778869..b2de010b9e418f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -71,20 +71,35 @@ protected void runAfterCatalogReady() { Map queryStatisticsMap = getQueryStatisticsMap(); // 2 log query audit - List auditEventList = getQueryNeedAudit(); - for (AuditEvent auditEvent : auditEventList) { - TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId); - if (queryStats != null) { - auditEvent.scanRows = queryStats.scan_rows; - auditEvent.scanBytes = queryStats.scan_bytes; - auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage; - auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage; - auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; - auditEvent.cpuTimeMs = queryStats.cpu_ms; - auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; - auditEvent.shuffleSendRows = queryStats.shuffle_send_rows; + try { + List auditEventList = getQueryNeedAudit(); + int missedLogCount = 0; + int succLogCount = 0; + for (AuditEvent auditEvent : auditEventList) { + TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId); + if (queryStats != null) { + auditEvent.scanRows = queryStats.scan_rows; + auditEvent.scanBytes = queryStats.scan_bytes; + auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage; + auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage; + auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; + auditEvent.cpuTimeMs = queryStats.cpu_ms; + auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; + auditEvent.shuffleSendRows = queryStats.shuffle_send_rows; + } + boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true); + if (!ret) { + missedLogCount++; + } else { + succLogCount++; + } + } + if (missedLogCount > 0) { + LOG.warn("discard audit event because of log queue is full, discard num : {}, succ num : {}", + missedLogCount, succLogCount); } - Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); + } catch (Throwable t) { + LOG.warn("exception happens when handleAuditEvent, ", t); } // 3 clear beToQueryStatsMap when be report timeout @@ -94,6 +109,12 @@ protected void runAfterCatalogReady() { public void submitFinishQueryToAudit(AuditEvent event) { queryAuditEventLogWriteLock(); try { + if (queryAuditEventList.size() >= Config.audit_event_log_queue_size) { + LOG.warn("audit log event queue size {} is full, this may cause audit log missed." + + "you can check whether qps is too high or reset audit_event_log_queue_size", + queryAuditEventList.size()); + return; + } event.pushToAuditLogQueueTime = System.currentTimeMillis(); queryAuditEventList.add(event); } finally {