Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2899,6 +2899,10 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true)
public static boolean enable_cloud_running_txn_check = true;

//* audit_event_log_queue_size = qps * query_audit_log_timeout_ms
@ConfField(mutable = true)
public static int audit_event_log_queue_size = 250000;

@ConfField(description = {"存算分离模式下streamload导入使用的转发策略, 可选值为public-private或者空",
"streamload route policy in cloud mode, availale options are public-private and empty string"})
public static String streamload_redirect_policy = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,21 @@ public void stop() {
}
}

public void handleAuditEvent(AuditEvent auditEvent) {
public boolean handleAuditEvent(AuditEvent auditEvent) {
return handleAuditEvent(auditEvent, false);
}

public boolean handleAuditEvent(AuditEvent auditEvent, boolean ignoreQueueFullLog) {
boolean isAddSucc = true;
try {
eventQueue.add(auditEvent);
} catch (Exception e) {
LOG.warn("encounter exception when handle audit event, ignore", e);
isAddSucc = false;
if (!ignoreQueueFullLog) {
LOG.warn("encounter exception when handle audit event {}, ignore", auditEvent.type, e);
}
}
return isAddSucc;
}

public class Worker implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,35 @@ protected void runAfterCatalogReady() {
Map<String, TQueryStatistics> queryStatisticsMap = getQueryStatisticsMap();

// 2 log query audit
List<AuditEvent> auditEventList = getQueryNeedAudit();
for (AuditEvent auditEvent : auditEventList) {
TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId);
if (queryStats != null) {
auditEvent.scanRows = queryStats.scan_rows;
auditEvent.scanBytes = queryStats.scan_bytes;
auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage;
auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage;
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
try {
List<AuditEvent> auditEventList = getQueryNeedAudit();
int missedLogCount = 0;
int succLogCount = 0;
for (AuditEvent auditEvent : auditEventList) {
TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId);
if (queryStats != null) {
auditEvent.scanRows = queryStats.scan_rows;
auditEvent.scanBytes = queryStats.scan_bytes;
auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage;
auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage;
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
}
boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true);
if (!ret) {
missedLogCount++;
} else {
succLogCount++;
}
}
if (missedLogCount > 0) {
LOG.warn("discard audit event because of log queue is full, discard num : {}, succ num : {}",
missedLogCount, succLogCount);
}
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
} catch (Throwable t) {
LOG.warn("exception happens when handleAuditEvent, ", t);
}

// 3 clear beToQueryStatsMap when be report timeout
Expand All @@ -94,6 +109,12 @@ protected void runAfterCatalogReady() {
public void submitFinishQueryToAudit(AuditEvent event) {
queryAuditEventLogWriteLock();
try {
if (queryAuditEventList.size() >= Config.audit_event_log_queue_size) {
LOG.warn("audit log event queue size {} is full, this may cause audit log missed."
+ "you can check whether qps is too high or reset audit_event_log_queue_size",
queryAuditEventList.size());
return;
}
event.pushToAuditLogQueueTime = System.currentTimeMillis();
queryAuditEventList.add(event);
} finally {
Expand Down