Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 26 additions & 25 deletions docs/en/extending-doris/audit-plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,31 +57,32 @@ After deployment is complete, and before installing the plugin, you need to crea
```
create table doris_audit_tbl__
(
    query_id varchar (48) comment "Unique query id",
    time datetime not null comment "Query start time",
    client_ip varchar (32) comment "Client IP",
    user varchar (64) comment "User name",
    db varchar (96) comment "Database of this query",
    state varchar (8) comment "Query result state. EOF, ERR, OK",
    query_time bigint comment "Query execution time in millisecond",
    scan_bytes bigint comment "Total scan bytes of this query",
    scan_rows bigint comment "Total scan rows of this query",
    return_rows bigint comment "Returned rows of this query",
    stmt_id int comment "An incremental id of statement",
    is_query tinyint comment "Is this statemt a query. 1 or 0",
    frontend_ip varchar (32) comment "Frontend ip of executing this statement",
    stmt varchar (2048) comment "The original statement, trimed if longer than 2048 bytes"
)
partition by range (time) ()
distributed by hash (query_id) buckets 1
properties (
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-30",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "1",
    "dynamic_partition.enable" = "true",
    "replication_num" = "1"
query_id varchar(48) comment "Unique query id",
time datetime not null comment "Query start time",
client_ip varchar(32) comment "Client IP",
user varchar(64) comment "User name",
db varchar(96) comment "Database of this query",
state varchar(8) comment "Query result state. EOF, ERR, OK",
query_time bigint comment "Query execution time in millisecond",
scan_bytes bigint comment "Total scan bytes of this query",
scan_rows bigint comment "Total scan rows of this query",
return_rows bigint comment "Returned rows of this query",
stmt_id int comment "An incremental id of statement",
is_query tinyint comment "Is this statemt a query. 1 or 0",
frontend_ip varchar(32) comment "Frontend ip of executing this statement",
stmt varchar(5000) comment "The original statement, trimed if longer than 5000 bytes"
) engine=OLAP
duplicate key(query_id, time, client_ip)
partition by range(time) ()
distributed by hash(query_id) buckets 1
properties(
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "1",
"dynamic_partition.enable" = "true",
"replication_num" = "3"
);
```

Expand Down
7 changes: 4 additions & 3 deletions docs/zh-CN/extending-doris/audit-plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ create table doris_audit_tbl__
stmt_id int comment "An incremental id of statement",
is_query tinyint comment "Is this statemt a query. 1 or 0",
frontend_ip varchar(32) comment "Frontend ip of executing this statement",
stmt varchar(2048) comment "The original statement, trimed if longer than 2048 bytes"
)
stmt varchar(5000) comment "The original statement, trimed if longer than 5000 bytes"
) engine=OLAP
duplicate key(query_id, time, client_ip)
partition by range(time) ()
distributed by hash(query_id) buckets 1
properties(
Expand All @@ -81,7 +82,7 @@ properties(
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "1",
"dynamic_partition.enable" = "true",
"replication_num" = "1"
"replication_num" = "3"
);
```

Expand Down
2 changes: 1 addition & 1 deletion fe_plugins/auditloader/src/main/assembly/plugin.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
name=AuditLoader
type=AUDIT
description=load audit log to olap load, and user can view the statistic of queries
version=0.12.1
version=0.13.1
java.version=1.8.0
classname=org.apache.doris.plugin.audit.AuditLoaderPlugin
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
private StringBuilder auditBuffer = new StringBuilder();
private long lastLoadTime = 0;

private BlockingQueue<StringBuilder> batchQueue = new LinkedBlockingDeque<StringBuilder>(1);
private BlockingQueue<AuditEvent> auditEventQueue = new LinkedBlockingDeque<AuditEvent>(1);
private DorisStreamLoader streamLoader;
private Thread loadThread;

Expand All @@ -63,7 +63,9 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
private volatile boolean isInit = false;

// the max stmt length to be loaded in audit table.
private static final int MAX_STMT_LENGTH = 2000;
private static final int MAX_STMT_LENGTH = 4096;
// the max auditEventQueue size to store audit_event
private static final int MAX_AUDIT_EVENT_SIZE = 4096;

@Override
public void init(PluginInfo info, PluginContext ctx) throws PluginException {
Expand Down Expand Up @@ -132,8 +134,14 @@ public boolean eventFilter(AuditEvent.EventType type) {
}

public void exec(AuditEvent event) {
assembleAudit(event);
loadIfNecessary();
try {
auditEventQueue.add(event);
} catch (Exception e) {
// In order to ensure that the system can run normally, here we directly
// discard the current audit_event. If this problem occurs frequently,
// improvement can be considered.
LOG.debug("encounter exception when putting current audit batch, discard current audit event", e);
}
}

private void assembleAudit(AuditEvent event) {
Expand All @@ -158,23 +166,16 @@ private void assembleAudit(AuditEvent event) {
auditBuffer.append(stmt).append("\n");
}

private void loadIfNecessary() {
private void loadIfNecessary(DorisStreamLoader loader) {
if (auditBuffer.length() < conf.maxBatchSize && System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) {
return;
}

lastLoadTime = System.currentTimeMillis();
// begin to load
try {
if (!batchQueue.isEmpty()) {
// TODO(cmy): if queue is not empty, which means the last batch is not processed.
// In order to ensure that the system can run normally, here we directly
// discard the current batch. If this problem occurs frequently,
// improvement can be considered.
throw new PluginException("The previous batch is not processed, and the current batch is discarded.");
}

batchQueue.put(this.auditBuffer);
DorisStreamLoader.LoadResponse response = loader.loadBatch(auditBuffer);
LOG.debug("audit loader response: {}", response);
} catch (Exception e) {
LOG.debug("encounter exception when putting current audit batch, discard current batch", e);
} finally {
Expand Down Expand Up @@ -243,16 +244,13 @@ public LoadWorker(DorisStreamLoader loader) {
public void run() {
while (!isClosed) {
try {
StringBuilder batch = batchQueue.poll(5, TimeUnit.SECONDS);
if (batch == null) {
continue;
AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS);
if (event != null) {
assembleAudit(event);
loadIfNecessary(loader);
}

DorisStreamLoader.LoadResponse response = loader.loadBatch(batch);
LOG.debug("audit loader response: {}", response);
} catch (InterruptedException e) {
LOG.debug("encounter exception when loading current audit batch", e);
continue;
}
}
}
Expand Down