diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java index 3e10b961ba5372..afe1f9af2da60d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java @@ -80,14 +80,18 @@ public class InternalSchema { AUDIT_SCHEMA.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); AUDIT_SCHEMA.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true)); AUDIT_SCHEMA.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_rows", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); AUDIT_SCHEMA.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true)); AUDIT_SCHEMA.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("is_nereids", TypeDef.create(PrimitiveType.TINYINT), true)); AUDIT_SCHEMA.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true)); AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true)); AUDIT_SCHEMA.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true)); AUDIT_SCHEMA.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true)); AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); AUDIT_SCHEMA.add(new ColumnDef("workload_group", TypeDef.create(PrimitiveType.STRING), true)); + // Keep stmt as last column. So that in fe.audit.log, it will be easier to get sql string AUDIT_SCHEMA.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java index 60886acd92faec..1afc9253585a7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands; +import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.nereids.analyzer.UnboundFunction; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.call.CallFunc; @@ -58,4 +59,15 @@ public R accept(PlanVisitor visitor, C context) { return visitor.visitCallCommand(this, context); } + @Override + public RedirectStatus toRedirectStatus() { + // Some of call statements may need to be redirected, some may not + String funcName = unboundFunction.getName().toUpperCase(); + switch (funcName) { + case "FLUSH_AUDIT_LOG": + return RedirectStatus.NO_FORWARD; + default: + return RedirectStatus.FORWARD_WITH_SYNC; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFlushAuditLogFunc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFlushAuditLogFunc.java new file mode 100644 index 00000000000000..60cae55e7f5397 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFlushAuditLogFunc.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.call; + +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Expression; + +import java.util.List; + +/** + * call flush_audit_log() + * This will flush audit log immediately to the audit_log table. + * Mainly for test cases, so that we don't need to wait 60 sec to flush the audit log. + */ +public class CallFlushAuditLogFunc extends CallFunc { + + private UserIdentity user; + + private CallFlushAuditLogFunc(UserIdentity user) { + this.user = user; + } + + public static CallFunc create(UserIdentity user, List args) { + if (!args.isEmpty()) { + throw new AnalysisException("FLUSH_AUDIT_LOG function requires no parameter"); + } + return new CallFlushAuditLogFunc(user); + } + + @Override + public void run() { + // check priv + if (!Env.getCurrentEnv().getAuth().checkGlobalPriv(user, PrivPredicate.ADMIN)) { + throw new AnalysisException("Only admin can flush audit log"); + } + // flush audit log + Env.getCurrentEnv().getPluginMgr().flushAuditLog(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java index 4a8cf560c28e77..4eac9c6f9b85e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java @@ -36,6 +36,8 @@ public static CallFunc getFunc(ConnectContext ctx, UserIdentity user, UnboundFun // TODO, built-in functions require a separate management case "EXECUTE_STMT": // Call built-in functions first return CallExecuteStmtFunc.create(user, unboundFunction.getArguments()); + case "FLUSH_AUDIT_LOG": + return CallFlushAuditLogFunc.create(user, unboundFunction.getArguments()); default: return CallProcedure.create(ctx, originSql); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index 9e5fc6ec5280f1..5fc735a0bb882a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -80,9 +80,9 @@ public enum EventType { public String queryId = ""; @AuditField(value = "IsQuery") public boolean isQuery = false; - @AuditField(value = "isNereids") + @AuditField(value = "IsNereids") public boolean isNereids = false; - @AuditField(value = "feIp") + @AuditField(value = "FeIp") public String feIp = ""; @AuditField(value = "Stmt") public String stmt = ""; @@ -94,12 +94,10 @@ public enum EventType { public long shuffleSendRows = -1; @AuditField(value = "SqlHash") public String sqlHash = ""; - @AuditField(value = "peakMemoryBytes") + @AuditField(value = "PeakMemoryBytes") public long peakMemoryBytes = -1; @AuditField(value = "SqlDigest") public String sqlDigest = ""; - @AuditField(value = "TraceId") - public String traceId = ""; @AuditField(value = "WorkloadGroup") public String workloadGroup = ""; // note: newly added fields should be always before fuzzyVariables diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java index ea69b247e66427..904c8352f4e2f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java @@ -61,6 +61,9 @@ public class PluginMgr implements Writable { // all dynamic plugins should have unique names, private final Set dynamicPluginNames; + // Save this handler for external call + private AuditLoader auditLoader = null; + public PluginMgr() { plugins = new Map[PluginType.MAX_PLUGIN_TYPE_SIZE]; for (int i = 0; i < PluginType.MAX_PLUGIN_TYPE_SIZE; i++) { @@ -113,8 +116,8 @@ private void initBuiltinPlugins() { } // AuditLoader: log audit log to internal table - AuditLoader auditLoaderPlugin = new AuditLoader(); - if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) { + this.auditLoader = new AuditLoader(); + if (!registerBuiltinPlugin(auditLoader.getPluginInfo(), auditLoader)) { LOG.warn("failed to register audit log builder"); } @@ -359,6 +362,12 @@ public List> getPluginShowInfos() { return rows; } + public void flushAuditLog() { + if (auditLoader != null) { + auditLoader.loadIfNecessary(true); + } + } + public void readFields(DataInputStream dis) throws IOException { int size = dis.readInt(); for (int i = 0; i < size; i++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java index 24eda23fc5b5cd..55dbba9805e762 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java @@ -35,8 +35,6 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -48,9 +46,6 @@ public class AuditLoader extends Plugin implements AuditPlugin { public static final String AUDIT_LOG_TABLE = "audit_log"; - private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") - .withZone(ZoneId.systemDefault()); - private StringBuilder auditLogBuffer = new StringBuilder(); private int auditLogNum = 0; private long lastLoadTimeAuditLog = 0; @@ -90,7 +85,7 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException { // GlobalVariable.audit_plugin_max_batch_bytes. this.auditEventQueue = Queues.newLinkedBlockingDeque(100000); this.streamLoader = new AuditStreamLoader(); - this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread"); + this.loadThread = new Thread(new LoadWorker(), "audit loader thread"); this.loadThread.start(); isInit = true; @@ -143,6 +138,7 @@ private void assembleAudit(AuditEvent event) { } private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { + // should be same order as InternalSchema.AUDIT_SCHEMA logBuffer.append(event.queryId).append("\t"); logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t"); logBuffer.append(event.clientIp).append("\t"); @@ -156,8 +152,11 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { logBuffer.append(event.scanBytes).append("\t"); logBuffer.append(event.scanRows).append("\t"); logBuffer.append(event.returnRows).append("\t"); + logBuffer.append(event.shuffleSendRows).append("\t"); + logBuffer.append(event.shuffleSendBytes).append("\t"); logBuffer.append(event.stmtId).append("\t"); logBuffer.append(event.isQuery ? 1 : 0).append("\t"); + logBuffer.append(event.isNereids ? 1 : 0).append("\t"); logBuffer.append(event.feIp).append("\t"); logBuffer.append(event.cpuTimeMs).append("\t"); logBuffer.append(event.sqlHash).append("\t"); @@ -172,10 +171,12 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { logBuffer.append(stmt).append("\n"); } - private void loadIfNecessary(AuditStreamLoader loader) { + // public for external call. + // synchronized to avoid concurrent load. + public synchronized void loadIfNecessary(boolean force) { long currentTime = System.currentTimeMillis(); - if (auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes + if (force || auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes || currentTime - lastLoadTimeAuditLog >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000) { // begin to load try { @@ -188,7 +189,7 @@ private void loadIfNecessary(AuditStreamLoader loader) { discardLogNum += auditLogNum; return; } - AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token); + AuditStreamLoader.LoadResponse response = streamLoader.loadBatch(auditLogBuffer, token); if (LOG.isDebugEnabled()) { LOG.debug("audit loader response: {}", response); } @@ -214,10 +215,8 @@ private void resetBatch(long currentTime) { } private class LoadWorker implements Runnable { - private AuditStreamLoader loader; - public LoadWorker(AuditStreamLoader loader) { - this.loader = loader; + public LoadWorker() { } public void run() { @@ -227,7 +226,7 @@ public void run() { if (event != null) { assembleAudit(event); // process all audit logs - loadIfNecessary(loader); + loadIfNecessary(false); } } catch (InterruptedException ie) { if (LOG.isDebugEnabled()) { @@ -240,3 +239,4 @@ public void run() { } } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java index 8d9e2c9d96efbc..161a5830b9ce5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java @@ -116,7 +116,7 @@ private void auditQueryLog(AuditEvent event) throws IllegalAccessException { if (af.value().equals("Time(ms)")) { queryTime = (long) f.get(event); } - sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event))); + sb.append("|").append(af.value()).append("=").append(f.get(event)); } String auditLog = sb.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index afb5b856403d12..02a66a5f6e1c55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -174,6 +174,7 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme CatalogIf catalog = ctx.getCurrentCatalog(); AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder(); + // ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query. auditEventBuilder.reset(); auditEventBuilder .setTimestamp(ctx.getStartTime()) diff --git a/regression-test/data/audit/test_audit_log_behavior.out b/regression-test/data/audit/test_audit_log_behavior.out new file mode 100644 index 00000000000000..96ad13a4c7ca14 --- /dev/null +++ b/regression-test/data/audit/test_audit_log_behavior.out @@ -0,0 +1,30 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !audit_log_schema -- +query_id varchar(48) Yes true \N +time datetime(3) Yes true \N +client_ip varchar(128) Yes true \N +user varchar(128) Yes false \N NONE +catalog varchar(128) Yes false \N NONE +db varchar(128) Yes false \N NONE +state varchar(128) Yes false \N NONE +error_code int Yes false \N NONE +error_message text Yes false \N NONE +query_time bigint Yes false \N NONE +scan_bytes bigint Yes false \N NONE +scan_rows bigint Yes false \N NONE +return_rows bigint Yes false \N NONE +shuffle_send_rows bigint Yes false \N NONE +shuffle_send_bytes bigint Yes false \N NONE +stmt_id bigint Yes false \N NONE +stmt_type varchar(48) Yes false \N NONE +is_query tinyint Yes false \N NONE +is_nereids tinyint Yes false \N NONE +frontend_ip varchar(128) Yes false \N NONE +cpu_time_ms bigint Yes false \N NONE +sql_hash varchar(128) Yes false \N NONE +sql_digest varchar(128) Yes false \N NONE +peak_memory_bytes bigint Yes false \N NONE +workload_group text Yes false \N NONE +compute_group text Yes false \N NONE +stmt text Yes false \N NONE + diff --git a/regression-test/suites/audit/test_audit_log_behavior.groovy b/regression-test/suites/audit/test_audit_log_behavior.groovy index a43b456e6d073e..163de0c931ebb5 100644 --- a/regression-test/suites/audit/test_audit_log_behavior.groovy +++ b/regression-test/suites/audit/test_audit_log_behavior.groovy @@ -19,7 +19,7 @@ suite("test_audit_log_behavior") { try { sql "set global enable_audit_plugin = true" sql "set global audit_plugin_max_sql_length = 58" - sql "set global audit_plugin_max_batch_interval_sec = 1" + // sql "set global audit_plugin_max_batch_interval_sec = 1" } catch (Exception e) { log.warn("skip this case, because " + e.getMessage()) assertTrue(e.getMessage().toUpperCase().contains("ADMIN")) @@ -71,6 +71,8 @@ suite("test_audit_log_behavior") { ] ] + qt_audit_log_schema """desc internal.__internal_schema.audit_log""" + for (def on : [true, false]) { sql "set enable_nereids_planner=${on}" sql "truncate table __internal_schema.audit_log" @@ -80,6 +82,10 @@ suite("test_audit_log_behavior") { sql tuple2[0] } + // make sure audit event is created. + // see WorkloadRuntimeStatusMgr.getQueryNeedAudit() + Thread.sleep(6000) + sql """call flush_audit_log()""" // check result for (int i = 0; i < cnt; i++) { def tuple2 = sqls.get(i) @@ -96,6 +102,7 @@ suite("test_audit_log_behavior") { assertEquals(res[0][0].toString(), tuple2[1].toString()) } } + // do not turn off sql "set global enable_audit_plugin = false" sql "set global audit_plugin_max_sql_length = 4096" sql "set global audit_plugin_max_batch_interval_sec = 60"