From 0f78cfac6d67974ae9a8c4e16aa7ca682d1c92d0 Mon Sep 17 00:00:00 2001 From: liulijia Date: Tue, 29 Dec 2020 18:19:03 +0800 Subject: [PATCH 1/3] For #5169 Add publish timout param when exec insert --- .../main/java/org/apache/doris/common/Config.java | 5 +++++ .../java/org/apache/doris/qe/ConnectProcessor.java | 4 ++++ .../java/org/apache/doris/qe/MasterOpExecutor.java | 3 +++ .../java/org/apache/doris/qe/SessionVariable.java | 13 +++++++++++++ .../main/java/org/apache/doris/qe/StmtExecutor.java | 6 +++++- gensrc/thrift/FrontendService.thrift | 1 + 6 files changed, 31 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index c3ba37afad0c05..4409089e5f7b85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1311,4 +1311,9 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static boolean enable_fe_heartbeat_by_thrift = false; + + /** + * default value for insert_visible_timeout_ms + */ + public static long default_insert_visible_timeout_ms = 10000; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index e32cbee1e5f545..679e1841df7c96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -415,6 +415,10 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) { ctx.setCurrentUserIdentity(currentUserIdentity); } + if (request.isSetInsertVisibleTimeoutMs()) { + ctx.getSessionVariable().setInsertVisibleTimeoutMs(request.getInsertVisibleTimeoutMs()); + } + if (request.isSetQueryOptions()) { TQueryOptions queryOptions = request.getQueryOptions(); if (queryOptions.isSetMemLimit()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index e222a2e4133973..a89958a29ec9a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -93,6 +93,9 @@ private void forward() throws Exception { params.setStmtId(ctx.getStmtId()); params.setEnableStrictMode(ctx.getSessionVariable().getEnableInsertStrict()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + if (ctx.getSessionVariable().getInsertVisibleTimeoutMs() > 0) { + params.setInsertVisibleTimeoutMs(ctx.getSessionVariable().getInsertVisibleTimeoutMs()); + } TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setMemLimit(ctx.getSessionVariable().getMaxExecMemByte()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 1a96c137609cc7..fb33aef0b51e47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -112,6 +112,12 @@ public class SessionVariable implements Serializable, Writable { // when true, the partition column must be set to NOT NULL. public static final String ALLOW_PARTITION_COLUMN_NULLABLE = "allow_partition_column_nullable"; + // max ms to wait transaction publish finish when exec insert stmt. + public static final String INSERT_VISIBLE_TIMEOUT_MS = "insert_visible_timeout_ms"; + + @VariableMgr.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS) + private long insertVisibleTimeoutMs = -1; + // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) public long maxExecMemByte = 2147483648L; @@ -544,6 +550,13 @@ public void setShowHiddenColumns(boolean showHiddenColumns) { public boolean isAllowPartitionColumnNullable() { return allowPartitionColumnNullable; } + public long getInsertVisibleTimeoutMs() { + return insertVisibleTimeoutMs; + } + + public void setInsertVisibleTimeoutMs(long insertVisibleTimeoutMs) { + this.insertVisibleTimeoutMs = insertVisibleTimeoutMs; + } // Serialize to thrift object // used for rest api diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 6e33853eddd55b..3dd271f1693c4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -898,10 +898,14 @@ private void handleInsertStmt() throws Exception { return; } + long visibleTimeoutMs = context.getSessionVariable().getInsertVisibleTimeoutMs(); + if (visibleTimeoutMs < 0) { + visibleTimeoutMs = Config.default_insert_visible_timeout_ms; + } if (Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( insertStmt.getDbObj(), insertStmt.getTransactionId(), TabletCommitInfo.fromThrift(coord.getCommitInfos()), - 10000)) { + visibleTimeoutMs)) { txnStatus = TransactionStatus.VISIBLE; MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); } else { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index b10fbe7d16919b..df3ddbe90d23a4 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -443,6 +443,7 @@ struct TMasterOpRequest { 15: optional i32 stmtIdx // the idx of the sql in multi statements 16: optional PaloInternalService.TQueryOptions query_options 17: optional Types.TUniqueId query_id // when this is a query, we translate this query id to master + 18: optional i64 insert_visible_timeout_ms } struct TColumnDefinition { From b5f047f946f5621e6c0a692fdd966f1f05e282b4 Mon Sep 17 00:00:00 2001 From: liulijia Date: Thu, 7 Jan 2021 18:47:20 +0800 Subject: [PATCH 2/3] Remove default insert visible timeout in Config & add min insert visible timeout --- .../java/org/apache/doris/common/Config.java | 5 ----- .../org/apache/doris/qe/MasterOpExecutor.java | 4 +--- .../org/apache/doris/qe/SessionVariable.java | 16 +++++++++++++--- .../java/org/apache/doris/qe/StmtExecutor.java | 7 +------ 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 4409089e5f7b85..c3ba37afad0c05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1311,9 +1311,4 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static boolean enable_fe_heartbeat_by_thrift = false; - - /** - * default value for insert_visible_timeout_ms - */ - public static long default_insert_visible_timeout_ms = 10000; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index a89958a29ec9a3..fa6272820a6dce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -93,9 +93,7 @@ private void forward() throws Exception { params.setStmtId(ctx.getStmtId()); params.setEnableStrictMode(ctx.getSessionVariable().getEnableInsertStrict()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - if (ctx.getSessionVariable().getInsertVisibleTimeoutMs() > 0) { - params.setInsertVisibleTimeoutMs(ctx.getSessionVariable().getInsertVisibleTimeoutMs()); - } + params.setInsertVisibleTimeoutMs(ctx.getSessionVariable().getInsertVisibleTimeoutMs()); TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setMemLimit(ctx.getSessionVariable().getMaxExecMemByte()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index fb33aef0b51e47..885c543ed7f8eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -114,9 +114,11 @@ public class SessionVariable implements Serializable, Writable { // max ms to wait transaction publish finish when exec insert stmt. public static final String INSERT_VISIBLE_TIMEOUT_MS = "insert_visible_timeout_ms"; + public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000; + public static final long MIN_INSERT_VISIBLE_TIMEOUT_MS = 1000; // If user set a very small value, use this value instead. @VariableMgr.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS) - private long insertVisibleTimeoutMs = -1; + private long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) @@ -551,11 +553,19 @@ public void setShowHiddenColumns(boolean showHiddenColumns) { public boolean isAllowPartitionColumnNullable() { return allowPartitionColumnNullable; } public long getInsertVisibleTimeoutMs() { - return insertVisibleTimeoutMs; + if (insertVisibleTimeoutMs < MIN_INSERT_VISIBLE_TIMEOUT_MS) { + return MIN_INSERT_VISIBLE_TIMEOUT_MS; + } else { + return insertVisibleTimeoutMs; + } } public void setInsertVisibleTimeoutMs(long insertVisibleTimeoutMs) { - this.insertVisibleTimeoutMs = insertVisibleTimeoutMs; + if (insertVisibleTimeoutMs < MIN_INSERT_VISIBLE_TIMEOUT_MS) { + this.insertVisibleTimeoutMs = MIN_INSERT_VISIBLE_TIMEOUT_MS; + } else { + this.insertVisibleTimeoutMs = insertVisibleTimeoutMs; + } } // Serialize to thrift object diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 3dd271f1693c4f..d2b51ce6e47b8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -897,15 +897,10 @@ private void handleInsertStmt() throws Exception { context.getState().setOk(); return; } - - long visibleTimeoutMs = context.getSessionVariable().getInsertVisibleTimeoutMs(); - if (visibleTimeoutMs < 0) { - visibleTimeoutMs = Config.default_insert_visible_timeout_ms; - } if (Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( insertStmt.getDbObj(), insertStmt.getTransactionId(), TabletCommitInfo.fromThrift(coord.getCommitInfos()), - visibleTimeoutMs)) { + context.getSessionVariable().getInsertVisibleTimeoutMs())) { txnStatus = TransactionStatus.VISIBLE; MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); } else { From f09339912bd5177b6f33ccb9a2f5460cbaa81da5 Mon Sep 17 00:00:00 2001 From: liulijia Date: Thu, 7 Jan 2021 19:11:39 +0800 Subject: [PATCH 3/3] Add insert_visible_timeout_ms to doc. --- docs/en/administrator-guide/variables.md | 8 +++++++- docs/zh-CN/administrator-guide/variables.md | 7 ++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md index 332811547a4c32..468df826b8cc84 100644 --- a/docs/en/administrator-guide/variables.md +++ b/docs/en/administrator-guide/variables.md @@ -74,6 +74,7 @@ Variables that support both session-level and global-level setting include: * `parallel_fragment_exec_instance_num` * `parallel_exchange_instance_num` * `allow_partition_column_nullable` +* `insert_visible_timeout_ms` Variables that support only global-level setting include: @@ -364,4 +365,9 @@ Note that the comment must start with /*+ and can only follow the SELECT. * `allow_partition_column_nullable` - Whether to allow the partition column to be NULL when creating the table. The default is true, which means NULL is allowed. false means the partition column must be defined as NOT NULL. \ No newline at end of file + Whether to allow the partition column to be NULL when creating the table. The default is true, which means NULL is allowed. false means the partition column must be defined as NOT NULL. + +* `insert_visible_timeout_ms` + + When execute insert statement, doris will wait for the transaction to commit and visible after the import is completed. + This parameter controls the timeout of waiting for transaction to be visible. The default value is 10000, and the minimum value is 1000. \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md index 1b71c065d09e11..00df741e5dd0f3 100644 --- a/docs/zh-CN/administrator-guide/variables.md +++ b/docs/zh-CN/administrator-guide/variables.md @@ -74,6 +74,7 @@ SET GLOBAL exec_mem_limit = 137438953472 * `parallel_fragment_exec_instance_num` * `parallel_exchange_instance_num` * `allow_partition_column_nullable` +* `insert_visible_timeout_ms` 只支持全局生效的变量包括: @@ -363,4 +364,8 @@ SELECT /*+ SET_VAR(query_timeout = 1) */ sleep(3); * `allow_partition_column_nullable` - 建表时是否允许分区列为NULL。默认为true,表示允许为NULL。false 表示分区列必须被定义为NOT NULL \ No newline at end of file + 建表时是否允许分区列为NULL。默认为true,表示允许为NULL。false 表示分区列必须被定义为NOT NULL + +* `insert_visible_timeout_ms` + + 在执行insert语句时,导入动作(查询和插入)完成后,还需要等待事务提交,使数据可见。此参数控制等待数据可见的超时时间,默认为10000,最小为1000。 \ No newline at end of file