diff --git a/docs/en/extending-doris/audit-plugin.md b/docs/en/extending-doris/audit-plugin.md index 651159d2cee05a..4825390fa424e4 100644 --- a/docs/en/extending-doris/audit-plugin.md +++ b/docs/en/extending-doris/audit-plugin.md @@ -57,31 +57,32 @@ After deployment is complete, and before installing the plugin, you need to crea ``` create table doris_audit_tbl__ ( -    query_id varchar (48) comment "Unique query id", -    time datetime not null comment "Query start time", -    client_ip varchar (32) comment "Client IP", -    user varchar (64) comment "User name", -    db varchar (96) comment "Database of this query", -    state varchar (8) comment "Query result state. EOF, ERR, OK", -    query_time bigint comment "Query execution time in millisecond", -    scan_bytes bigint comment "Total scan bytes of this query", -    scan_rows bigint comment "Total scan rows of this query", -    return_rows bigint comment "Returned rows of this query", -    stmt_id int comment "An incremental id of statement", -    is_query tinyint comment "Is this statemt a query. 1 or 0", -    frontend_ip varchar (32) comment "Frontend ip of executing this statement", -    stmt varchar (2048) comment "The original statement, trimed if longer than 2048 bytes" -) -partition by range (time) () -distributed by hash (query_id) buckets 1 -properties ( -    "dynamic_partition.time_unit" = "DAY", -    "dynamic_partition.start" = "-30", -    "dynamic_partition.end" = "3", -    "dynamic_partition.prefix" = "p", -    "dynamic_partition.buckets" = "1", -    "dynamic_partition.enable" = "true", -    "replication_num" = "1" + query_id varchar(48) comment "Unique query id", + time datetime not null comment "Query start time", + client_ip varchar(32) comment "Client IP", + user varchar(64) comment "User name", + db varchar(96) comment "Database of this query", + state varchar(8) comment "Query result state. 
EOF, ERR, OK", + query_time bigint comment "Query execution time in millisecond", + scan_bytes bigint comment "Total scan bytes of this query", + scan_rows bigint comment "Total scan rows of this query", + return_rows bigint comment "Returned rows of this query", + stmt_id int comment "An incremental id of statement", + is_query tinyint comment "Is this statement a query. 1 or 0", + frontend_ip varchar(32) comment "Frontend ip of executing this statement", + stmt varchar(5000) comment "The original statement, trimmed if longer than 5000 bytes" +) engine=OLAP +duplicate key(query_id, time, client_ip) +partition by range(time) () +distributed by hash(query_id) buckets 1 +properties( + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.start" = "-30", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "dynamic_partition.buckets" = "1", + "dynamic_partition.enable" = "true", + "replication_num" = "3" ); ``` diff --git a/docs/zh-CN/extending-doris/audit-plugin.md b/docs/zh-CN/extending-doris/audit-plugin.md index 71682516245520..e5f9e4c4cf89b1 100644 --- a/docs/zh-CN/extending-doris/audit-plugin.md +++ b/docs/zh-CN/extending-doris/audit-plugin.md @@ -70,8 +70,9 @@ create table doris_audit_tbl__ stmt_id int comment "An incremental id of statement", is_query tinyint comment "Is this statemt a query. 
1 or 0", frontend_ip varchar(32) comment "Frontend ip of executing this statement", - stmt varchar(2048) comment "The original statement, trimed if longer than 2048 bytes" -) + stmt varchar(5000) comment "The original statement, trimed if longer than 5000 bytes" +) engine=OLAP +duplicate key(query_id, time, client_ip) partition by range(time) () distributed by hash(query_id) buckets 1 properties( @@ -81,7 +82,7 @@ properties( "dynamic_partition.prefix" = "p", "dynamic_partition.buckets" = "1", "dynamic_partition.enable" = "true", - "replication_num" = "1" + "replication_num" = "3" ); ``` diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.properties b/fe_plugins/auditloader/src/main/assembly/plugin.properties index 3ab896c5c30bad..0331385651bdc3 100755 --- a/fe_plugins/auditloader/src/main/assembly/plugin.properties +++ b/fe_plugins/auditloader/src/main/assembly/plugin.properties @@ -18,6 +18,6 @@ name=AuditLoader type=AUDIT description=load audit log to olap load, and user can view the statistic of queries -version=0.12.1 +version=0.13.1 java.version=1.8.0 classname=org.apache.doris.plugin.audit.AuditLoaderPlugin diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index 5e7ca2df45b580..a7857689bbd041 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -54,7 +54,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { private StringBuilder auditBuffer = new StringBuilder(); private long lastLoadTime = 0; - private BlockingQueue batchQueue = new LinkedBlockingDeque(1); + private BlockingQueue auditEventQueue = new LinkedBlockingDeque(1); private DorisStreamLoader streamLoader; private Thread loadThread; @@ -63,7 +63,9 @@ public class AuditLoaderPlugin 
extends Plugin implements AuditPlugin { private volatile boolean isInit = false; // the max stmt length to be loaded in audit table. - private static final int MAX_STMT_LENGTH = 2000; + private static final int MAX_STMT_LENGTH = 4096; + // the max auditEventQueue size to store audit_event + private static final int MAX_AUDIT_EVENT_SIZE = 4096; @Override public void init(PluginInfo info, PluginContext ctx) throws PluginException { @@ -132,8 +134,14 @@ public boolean eventFilter(AuditEvent.EventType type) { } public void exec(AuditEvent event) { - assembleAudit(event); - loadIfNecessary(); + try { + auditEventQueue.add(event); + } catch (Exception e) { + // In order to ensure that the system can run normally, here we directly + // discard the current audit_event. If this problem occurs frequently, + // improvement can be considered. + LOG.debug("encounter exception when putting current audit batch, discard current audit event", e); + } } private void assembleAudit(AuditEvent event) { @@ -158,7 +166,7 @@ private void assembleAudit(AuditEvent event) { auditBuffer.append(stmt).append("\n"); } - private void loadIfNecessary() { + private void loadIfNecessary(DorisStreamLoader loader) { if (auditBuffer.length() < conf.maxBatchSize && System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) { return; } @@ -166,15 +174,8 @@ private void loadIfNecessary() { lastLoadTime = System.currentTimeMillis(); // begin to load try { - if (!batchQueue.isEmpty()) { - // TODO(cmy): if queue is not empty, which means the last batch is not processed. - // In order to ensure that the system can run normally, here we directly - // discard the current batch. If this problem occurs frequently, - // improvement can be considered. 
- throw new PluginException("The previous batch is not processed, and the current batch is discarded."); - } - - batchQueue.put(this.auditBuffer); + DorisStreamLoader.LoadResponse response = loader.loadBatch(auditBuffer); + LOG.debug("audit loader response: {}", response); } catch (Exception e) { LOG.debug("encounter exception when putting current audit batch, discard current batch", e); } finally { @@ -243,16 +244,13 @@ public LoadWorker(DorisStreamLoader loader) { public void run() { while (!isClosed) { try { - StringBuilder batch = batchQueue.poll(5, TimeUnit.SECONDS); - if (batch == null) { - continue; + AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS); + if (event != null) { + assembleAudit(event); + loadIfNecessary(loader); } - - DorisStreamLoader.LoadResponse response = loader.loadBatch(batch); - LOG.debug("audit loader response: {}", response); } catch (InterruptedException e) { LOG.debug("encounter exception when loading current audit batch", e); - continue; } } }