From dc761625b335eb17797ff6eb9b4e798ca2d3b60e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Tue, 5 Jan 2021 18:33:13 +0800 Subject: [PATCH 1/4] [Enhancement] Fix bug that audit event cannot be processed in time --- .../src/main/assembly/plugin.properties | 2 +- .../doris/plugin/audit/AuditLoaderPlugin.java | 43 ++++++++++--------- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.properties b/fe_plugins/auditloader/src/main/assembly/plugin.properties index 3ab896c5c30bad..0331385651bdc3 100755 --- a/fe_plugins/auditloader/src/main/assembly/plugin.properties +++ b/fe_plugins/auditloader/src/main/assembly/plugin.properties @@ -18,6 +18,6 @@ name=AuditLoader type=AUDIT description=load audit log to olap load, and user can view the statistic of queries -version=0.12.1 +version=0.13.1 java.version=1.8.0 classname=org.apache.doris.plugin.audit.AuditLoaderPlugin diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index 5e7ca2df45b580..bcd0523740b67b 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -54,7 +54,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { private StringBuilder auditBuffer = new StringBuilder(); private long lastLoadTime = 0; - private BlockingQueue batchQueue = new LinkedBlockingDeque(1); + private BlockingQueue auditEventQueue = new LinkedBlockingDeque(1); private DorisStreamLoader streamLoader; private Thread loadThread; @@ -63,7 +63,9 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { private volatile boolean isInit = false; // the max stmt length to be loaded in audit table. - private static final int MAX_STMT_LENGTH = 2000; + private static final int MAX_STMT_LENGTH = 5000; + // the max auditEventQueue size to store audit_event + private static final int MAX_AUDIT_EVENT_SIZE = 4096; @Override public void init(PluginInfo info, PluginContext ctx) throws PluginException { @@ -132,8 +134,17 @@ public boolean eventFilter(AuditEvent.EventType type) { } public void exec(AuditEvent event) { - assembleAudit(event); - loadIfNecessary(); + try { + // In order to ensure that the system can run normally, here we directly + // discard the current audit_event. If this problem occurs frequently, + // improvement can be considered. + if (auditEventQueue.size() >= MAX_AUDIT_EVENT_SIZE) { + throw new PluginException("The previous batch is not processed, and the current batch is discarded."); + } + auditEventQueue.add(event); + } catch (Exception e) { + LOG.debug("encounter exception when putting current audit batch, discard current audit event", e); + } } private void assembleAudit(AuditEvent event) { @@ -158,7 +169,7 @@ private void assembleAudit(AuditEvent event) { auditBuffer.append(stmt).append("\n"); } - private void loadIfNecessary() { + private void loadIfNecessary(DorisStreamLoader loader) { if (auditBuffer.length() < conf.maxBatchSize && System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) { return; } @@ -166,15 +177,8 @@ private void loadIfNecessary() { lastLoadTime = System.currentTimeMillis(); // begin to load try { - if (!batchQueue.isEmpty()) { - // TODO(cmy): if queue is not empty, which means the last batch is not processed. - // In order to ensure that the system can run normally, here we directly - // discard the current batch. If this problem occurs frequently, - // improvement can be considered. - throw new PluginException("The previous batch is not processed, and the current batch is discarded."); - } - - batchQueue.put(this.auditBuffer); + DorisStreamLoader.LoadResponse response = loader.loadBatch(auditBuffer); + LOG.debug("audit loader response: {}", response); } catch (Exception e) { LOG.debug("encounter exception when putting current audit batch, discard current batch", e); } finally { @@ -243,16 +247,13 @@ public LoadWorker(DorisStreamLoader loader) { public void run() { while (!isClosed) { try { - StringBuilder batch = batchQueue.poll(5, TimeUnit.SECONDS); - if (batch == null) { - continue; + AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS); + if (event != null) { + assembleAudit(event); + loadIfNecessary(loader); } - - DorisStreamLoader.LoadResponse response = loader.loadBatch(batch); - LOG.debug("audit loader response: {}", response); } catch (InterruptedException e) { LOG.debug("encounter exception when loading current audit batch", e); - continue; } } } From 2e201a63614280e45122999783c2ffb292e8a87e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Wed, 13 Jan 2021 12:05:12 +0800 Subject: [PATCH 2/4] fix --- .../org/apache/doris/plugin/audit/AuditLoaderPlugin.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index bcd0523740b67b..c67aa6f85f23d1 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -135,14 +135,11 @@ public boolean eventFilter(AuditEvent.EventType type) { public void exec(AuditEvent event) { try { + auditEventQueue.add(event); + } catch (Exception e) { // In order to ensure that the system can run normally, here we directly // discard the current audit_event. If this problem occurs frequently, // improvement can be considered. - if (auditEventQueue.size() >= MAX_AUDIT_EVENT_SIZE) { - throw new PluginException("The previous batch is not processed, and the current batch is discarded."); - } - auditEventQueue.add(event); - } catch (Exception e) { LOG.debug("encounter exception when putting current audit batch, discard current audit event", e); } } From 00a3176e8148b5a94b5ee0494bb7925520bde477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Wed, 13 Jan 2021 16:11:03 +0800 Subject: [PATCH 3/4] fix doc --- docs/en/extending-doris/audit-plugin.md | 51 ++++++++++--------- docs/zh-CN/extending-doris/audit-plugin.md | 5 +- .../doris/plugin/audit/AuditLoaderPlugin.java | 2 +- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/docs/en/extending-doris/audit-plugin.md b/docs/en/extending-doris/audit-plugin.md index 651159d2cee05a..11dd08e82bb019 100644 --- a/docs/en/extending-doris/audit-plugin.md +++ b/docs/en/extending-doris/audit-plugin.md @@ -57,31 +57,32 @@ After deployment is complete, and before installing the plugin, you need to crea ``` create table doris_audit_tbl__ ( -    query_id varchar (48) comment "Unique query id", -    time datetime not null comment "Query start time", -    client_ip varchar (32) comment "Client IP", -    user varchar (64) comment "User name", -    db varchar (96) comment "Database of this query", -    state varchar (8) comment "Query result state. EOF, ERR, OK", -    query_time bigint comment "Query execution time in millisecond", -    scan_bytes bigint comment "Total scan bytes of this query", -    scan_rows bigint comment "Total scan rows of this query", -    return_rows bigint comment "Returned rows of this query", -    stmt_id int comment "An incremental id of statement", -    is_query tinyint comment "Is this statemt a query. 1 or 0", -    frontend_ip varchar (32) comment "Frontend ip of executing this statement", -    stmt varchar (2048) comment "The original statement, trimed if longer than 2048 bytes" -) -partition by range (time) () -distributed by hash (query_id) buckets 1 -properties ( -    "dynamic_partition.time_unit" = "DAY", -    "dynamic_partition.start" = "-30", -    "dynamic_partition.end" = "3", -    "dynamic_partition.prefix" = "p", -    "dynamic_partition.buckets" = "1", -    "dynamic_partition.enable" = "true", -    "replication_num" = "1" + query_id varchar(48) comment "Unique query id", + time datetime not null comment "Query start time", + client_ip varchar(32) comment "Client IP", + user varchar(64) comment "User name", + db varchar(96) comment "Database of this query", + state varchar(8) comment "Query result state. EOF, ERR, OK", + query_time bigint comment "Query execution time in millisecond", + scan_bytes bigint comment "Total scan bytes of this query", + scan_rows bigint comment "Total scan rows of this query", + return_rows bigint comment "Returned rows of this query", + stmt_id int comment "An incremental id of statement", + is_query tinyint comment "Is this statemt a query. 1 or 0", + frontend_ip varchar(32) comment "Frontend ip of executing this statement", + stmt varchar(5000) comment "The original statement, trimed if longer than 5000 bytes" +) engine=OLAP +duplicate key(query_id, time, client_ip) +partition by range(time) () +distributed by hash(query_id) buckets 1 +properties( + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.start" = "-30", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "dynamic_partition.buckets" = "1", + "dynamic_partition.enable" = "true", + "replication_num" = "1" ); ``` diff --git a/docs/zh-CN/extending-doris/audit-plugin.md b/docs/zh-CN/extending-doris/audit-plugin.md index 71682516245520..1a6b2de5b0ea4f 100644 --- a/docs/zh-CN/extending-doris/audit-plugin.md +++ b/docs/zh-CN/extending-doris/audit-plugin.md @@ -70,8 +70,9 @@ create table doris_audit_tbl__ stmt_id int comment "An incremental id of statement", is_query tinyint comment "Is this statemt a query. 1 or 0", frontend_ip varchar(32) comment "Frontend ip of executing this statement", - stmt varchar(2048) comment "The original statement, trimed if longer than 2048 bytes" -) + stmt varchar(5000) comment "The original statement, trimed if longer than 5000 bytes" +) engine=OLAP +duplicate key(query_id, time, client_ip) partition by range(time) () distributed by hash(query_id) buckets 1 properties( diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index c67aa6f85f23d1..a7857689bbd041 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -63,7 +63,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { private volatile boolean isInit = false; // the max stmt length to be loaded in audit table. - private static final int MAX_STMT_LENGTH = 5000; + private static final int MAX_STMT_LENGTH = 4096; // the max auditEventQueue size to store audit_event private static final int MAX_AUDIT_EVENT_SIZE = 4096; From 1a36d02d458795a5fe4fa98853b39e4a131371ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Wed, 13 Jan 2021 16:14:06 +0800 Subject: [PATCH 4/4] fix doc for replication_num config --- docs/en/extending-doris/audit-plugin.md | 2 +- docs/zh-CN/extending-doris/audit-plugin.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/extending-doris/audit-plugin.md b/docs/en/extending-doris/audit-plugin.md index 11dd08e82bb019..4825390fa424e4 100644 --- a/docs/en/extending-doris/audit-plugin.md +++ b/docs/en/extending-doris/audit-plugin.md @@ -82,7 +82,7 @@ properties( "dynamic_partition.prefix" = "p", "dynamic_partition.buckets" = "1", "dynamic_partition.enable" = "true", - "replication_num" = "1" + "replication_num" = "3" ); ``` diff --git a/docs/zh-CN/extending-doris/audit-plugin.md b/docs/zh-CN/extending-doris/audit-plugin.md index 1a6b2de5b0ea4f..e5f9e4c4cf89b1 100644 --- a/docs/zh-CN/extending-doris/audit-plugin.md +++ b/docs/zh-CN/extending-doris/audit-plugin.md @@ -82,7 +82,7 @@ properties( "dynamic_partition.prefix" = "p", "dynamic_partition.buckets" = "1", "dynamic_partition.enable" = "true", - "replication_num" = "1" + "replication_num" = "3" ); ```