diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java index 56465f9c17bbea..4208d5def2ebac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java @@ -93,9 +93,7 @@ public void exec(AuditEvent event) { break; } } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("failed to process audit event", e); - } + LOG.warn("failed to process audit event: {}", event.queryId, e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java index 5cb826dc86c990..9a1e350d411d2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java @@ -48,7 +48,7 @@ public class AuditEventProcessor { private List auditPlugins; private long lastUpdateTime = 0; - private BlockingQueue eventQueue = Queues.newLinkedBlockingDeque(10000); + private BlockingQueue eventQueue = Queues.newLinkedBlockingDeque(); private Thread workerThread; private volatile boolean isStopped = false; @@ -89,22 +89,23 @@ public void stop() { } public boolean handleAuditEvent(AuditEvent auditEvent) { - return handleAuditEvent(auditEvent, false); - } - - public boolean handleAuditEvent(AuditEvent auditEvent, boolean ignoreQueueFullLog) { if (skipAuditUsers.contains(auditEvent.user)) { // return true to ignore this event return true; } boolean isAddSucc = true; try { - eventQueue.add(auditEvent); + if (eventQueue.size() >= Config.audit_event_log_queue_size) { + isAddSucc = false; + LOG.warn("the audit event queue is full with size {}, discard the audit event: {}", + eventQueue.size(), auditEvent.queryId); + } else { + eventQueue.add(auditEvent); + } } catch (Exception e) { isAddSucc = false; - if (!ignoreQueueFullLog) { - LOG.warn("encounter exception when handle audit event {}, ignore", auditEvent.type, e); - } + LOG.warn("encounter exception when handle audit event {}, discard the event", + auditEvent.queryId, e); } return isAddSucc; } @@ -130,9 +131,7 @@ public void run() { continue; } } catch (InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("encounter exception when getting audit event from queue, ignore", e); - } + LOG.warn("encounter exception when getting audit event from queue, ignore", e); continue; } @@ -143,9 +142,7 @@ public void run() { } } } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("encounter exception when processing audit event.", e); - } + LOG.warn("encounter exception when processing audit events. ignore", e); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 2694660330b5ef..df95d3f3173b9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; +import org.apache.doris.plugin.AuditEvent; import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.qe.QueryState.MysqlStateType; @@ -284,7 +285,11 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE && ctx.getState().getErrorCode() == null) { auditEventBuilder.setState(String.valueOf(MysqlStateType.OK)); } - Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build()); + AuditEvent event = auditEventBuilder.build(); + Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(event); + if (LOG.isDebugEnabled()) { + LOG.debug("submit audit event: {}", event.queryId); + } } private static String getStmtType(StatementBase stmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 55c4488e7ac5c4..cbc0aee98a5c8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -377,7 +377,6 @@ public void executeQuery(String originStmt) throws Exception { } auditAfterExec(auditStmt, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true); - LOG.debug("Write audit logs for query {}", DebugUtil.printId(ctx.queryId)); // execute failed, skip remaining stmts if (ctx.getState().getStateType() == MysqlStateType.ERR) { break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java index 9ded1f04cbc039..97b5061a212907 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java @@ -183,7 +183,7 @@ private void handleExecute() { } ctx.setStartTime(); - // nererids + // nereids PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(String.valueOf(stmtId)); if (preparedStatementContext == null) { LOG.warn("No such statement in context, stmtId:{}", stmtId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index 695bf983dc6b2f..0c2e4f458bc391 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -87,7 +87,7 @@ protected void runAfterCatalogReady() { auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; auditEvent.shuffleSendRows = queryStats.shuffle_send_rows; } - boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true); + boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); if (!ret) { missedLogCount++; } else { @@ -110,9 +110,10 @@ public void submitFinishQueryToAudit(AuditEvent event) { queryAuditEventLogWriteLock(); try { if (queryAuditEventList.size() >= Config.audit_event_log_queue_size) { - LOG.warn("audit log event queue size {} is full, this may cause audit log missed." - + "you can check whether qps is too high or reset audit_event_log_queue_size", - queryAuditEventList.size()); + LOG.warn("audit log event queue size {} is full, this may cause audit log missing statistics." + + "you can check whether qps is too high or " + + "set audit_event_log_queue_size to a larger value in fe.conf. query id: {}", + queryAuditEventList.size(), event.queryId); return; } event.pushToAuditLogQueueTime = System.currentTimeMillis(); @@ -122,7 +123,7 @@ public void submitFinishQueryToAudit(AuditEvent event) { } } - public List getQueryNeedAudit() { + private List getQueryNeedAudit() { List ret = new ArrayList<>(); long currentTime = System.currentTimeMillis(); queryAuditEventLogWriteLock();