diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 35e658d446da43..b6ec8df488b179 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -58,7 +58,7 @@ // EXPORT TABLE table_name [PARTITION (name1[, ...])] // TO 'export_target_path' // [PROPERTIES("key"="value")] -// BY BROKER 'broker_name' [( $broker_attrs)] +// WITH BROKER 'broker_name' [( $broker_attrs)] @Getter public class ExportStmt extends StatementBase { public static final String PARALLELISM = "parallelism"; @@ -67,6 +67,7 @@ public class ExportStmt extends StatementBase { private static final String DEFAULT_COLUMN_SEPARATOR = "\t"; private static final String DEFAULT_LINE_DELIMITER = "\n"; private static final String DEFAULT_PARALLELISM = "1"; + private static final Integer DEFAULT_TIMEOUT = 7200; private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(LABEL) @@ -76,6 +77,7 @@ public class ExportStmt extends StatementBase { .add(OutFileClause.PROP_DELETE_EXISTING_FILES) .add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR) .add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER) + .add(PropertyAnalyzer.PROPERTIES_TIMEOUT) .add("format") .build(); @@ -97,6 +99,8 @@ public class ExportStmt extends StatementBase { private Integer parallelism; + private Integer timeout; + private String maxFileSize; private String deleteExistingFiles; private SessionVariable sessionVariables; @@ -118,6 +122,7 @@ public ExportStmt(TableRef tableRef, Expr whereExpr, String path, this.brokerDesc = brokerDesc; this.columnSeparator = DEFAULT_COLUMN_SEPARATOR; this.lineDelimiter = DEFAULT_LINE_DELIMITER; + this.timeout = DEFAULT_TIMEOUT; Optional optionalSessionVariable = Optional.ofNullable( ConnectContext.get().getSessionVariable()); @@ -232,8 +237,10 @@ private void setJob() throws UserException { // set sessions exportJob.setQualifiedUser(this.qualifiedUser); exportJob.setUserIdentity(this.userIdentity); - exportJob.setSessionVariables(this.sessionVariables); - exportJob.setTimeoutSecond(this.sessionVariables.getQueryTimeoutS()); + SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable( + ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable())); + exportJob.setSessionVariables(clonedSessionVariable); + exportJob.setTimeoutSecond(this.timeout); exportJob.setOrigStmt(this.getOrigStmt()); } @@ -323,6 +330,15 @@ private void checkProperties(Map properties) throws UserExceptio throw new UserException("The value of parallelism is invalid!"); } + // timeout + String timeoutString = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT, + String.valueOf(DEFAULT_TIMEOUT)); + try { + this.timeout = Integer.parseInt(timeoutString); + } catch (NumberFormatException e) { + throw new UserException("The value of timeout is invalid!"); + } + // max_file_size this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, ""); this.deleteExistingFiles = properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, ""); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java index c7d7c4032ce1f6..b647154dc1165f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -168,6 +168,7 @@ public void cancel() throws JobException { private AutoCloseConnectContext buildConnectContext() { ConnectContext connectContext = new ConnectContext(); + exportJob.getSessionVariables().setQueryTimeoutS(exportJob.getTimeoutSecond()); connectContext.setSessionVariable(exportJob.getSessionVariables()); connectContext.setEnv(Env.getCurrentEnv()); connectContext.setDatabase(exportJob.getTableName().getDb()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java index 42ae03aaec40d8..f2cd13a4793bf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java @@ -68,7 +68,7 @@ * EXPORT TABLE table_name [PARTITION (name1[, ...])] * TO 'export_target_path' * [PROPERTIES("key"="value")] - * BY BROKER 'broker_name' [( $broker_attrs)] + * WITH BROKER 'broker_name' [( $broker_attrs)] */ public class ExportCommand extends Command implements ForwardWithSync { public static final String PARALLELISM = "parallelism"; @@ -76,6 +76,8 @@ public class ExportCommand extends Command implements ForwardWithSync { private static final String DEFAULT_COLUMN_SEPARATOR = "\t"; private static final String DEFAULT_LINE_DELIMITER = "\n"; private static final String DEFAULT_PARALLELISM = "1"; + private static final Integer DEFAULT_TIMEOUT = 7200; + private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(LABEL) .add(PARALLELISM) @@ -84,6 +86,7 @@ public class ExportCommand extends Command implements ForwardWithSync { .add(OutFileClause.PROP_DELETE_EXISTING_FILES) .add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR) .add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER) + .add(PropertyAnalyzer.PROPERTIES_TIMEOUT) .add("format") .build(); @@ -305,7 +308,18 @@ private ExportJob generateExportJob(ConnectContext ctx, Map file SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable( ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable())); exportJob.setSessionVariables(clonedSessionVariable); - exportJob.setTimeoutSecond(clonedSessionVariable.getQueryTimeoutS()); + + // set timeoutSecond + int timeoutSecond; + String timeoutString = fileProperties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT, + String.valueOf(DEFAULT_TIMEOUT)); + try { + timeoutSecond = Integer.parseInt(timeoutString); + } catch (NumberFormatException e) { + throw new UserException("The value of timeout is invalid!"); + } + + exportJob.setTimeoutSecond(timeoutSecond); // exportJob generate outfile sql exportJob.generateOutfileLogicalPlans(RelationUtil.getQualifierName(ctx, this.nameParts));