Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ public void exec(AuditEvent event) {
break;
}
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("failed to process audit event", e);
}
LOG.warn("failed to process audit event: {}", event.queryId, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class AuditEventProcessor {
private List<Plugin> auditPlugins;
private long lastUpdateTime = 0;

private BlockingQueue<AuditEvent> eventQueue = Queues.newLinkedBlockingDeque(10000);
private BlockingQueue<AuditEvent> eventQueue = Queues.newLinkedBlockingDeque();
private Thread workerThread;

private volatile boolean isStopped = false;
Expand Down Expand Up @@ -89,22 +89,23 @@ public void stop() {
}

public boolean handleAuditEvent(AuditEvent auditEvent) {
return handleAuditEvent(auditEvent, false);
}

public boolean handleAuditEvent(AuditEvent auditEvent, boolean ignoreQueueFullLog) {
if (skipAuditUsers.contains(auditEvent.user)) {
// return true to ignore this event
return true;
}
boolean isAddSucc = true;
try {
eventQueue.add(auditEvent);
if (eventQueue.size() >= Config.audit_event_log_queue_size) {
isAddSucc = false;
LOG.warn("the audit event queue is full with size {}, discard the audit event: {}",
eventQueue.size(), auditEvent.queryId);
} else {
eventQueue.add(auditEvent);
}
} catch (Exception e) {
isAddSucc = false;
if (!ignoreQueueFullLog) {
LOG.warn("encounter exception when handle audit event {}, ignore", auditEvent.type, e);
}
LOG.warn("encounter exception when handle audit event {}, discard the event",
auditEvent.queryId, e);
}
return isAddSucc;
}
Expand All @@ -130,9 +131,7 @@ public void run() {
continue;
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("encounter exception when getting audit event from queue, ignore", e);
}
LOG.warn("encounter exception when getting audit event from queue, ignore", e);
continue;
}

Expand All @@ -143,9 +142,7 @@ public void run() {
}
}
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("encounter exception when processing audit event.", e);
}
LOG.warn("encounter exception when processing audit events. ignore", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.qe.QueryState.MysqlStateType;
Expand Down Expand Up @@ -284,7 +285,11 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme
if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE && ctx.getState().getErrorCode() == null) {
auditEventBuilder.setState(String.valueOf(MysqlStateType.OK));
}
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build());
AuditEvent event = auditEventBuilder.build();
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(event);
if (LOG.isDebugEnabled()) {
LOG.debug("submit audit event: {}", event.queryId);
}
}

private static String getStmtType(StatementBase stmt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ public void executeQuery(String originStmt) throws Exception {
}
auditAfterExec(auditStmt, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(),
true);
LOG.debug("Write audit logs for query {}", DebugUtil.printId(ctx.queryId));
// execute failed, skip remaining stmts
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private void handleExecute() {
}

ctx.setStartTime();
// nererids
// nereids
PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(String.valueOf(stmtId));
if (preparedStatementContext == null) {
LOG.warn("No such statement in context, stmtId:{}", stmtId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected void runAfterCatalogReady() {
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
}
boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true);
boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
if (!ret) {
missedLogCount++;
} else {
Expand All @@ -110,9 +110,10 @@ public void submitFinishQueryToAudit(AuditEvent event) {
queryAuditEventLogWriteLock();
try {
if (queryAuditEventList.size() >= Config.audit_event_log_queue_size) {
LOG.warn("audit log event queue size {} is full, this may cause audit log missed."
+ "you can check whether qps is too high or reset audit_event_log_queue_size",
queryAuditEventList.size());
LOG.warn("audit log event queue size {} is full, this may cause audit log missing statistics."
+ "you can check whether qps is too high or "
+ "set audit_event_log_queue_size to a larger value in fe.conf. query id: {}",
queryAuditEventList.size(), event.queryId);
return;
}
event.pushToAuditLogQueueTime = System.currentTimeMillis();
Expand All @@ -122,7 +123,7 @@ public void submitFinishQueryToAudit(AuditEvent event) {
}
}

public List<AuditEvent> getQueryNeedAudit() {
private List<AuditEvent> getQueryNeedAudit() {
List<AuditEvent> ret = new ArrayList<>();
long currentTime = System.currentTimeMillis();
queryAuditEventLogWriteLock();
Expand Down
Loading