diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp
index fe9a8d315f2354..b25b07ca2ea111 100644
--- a/be/src/http/action/mini_load.cpp
+++ b/be/src/http/action/mini_load.cpp
@@ -112,6 +112,7 @@ const std::string COLUMNS_KEY = "columns";
 const std::string HLL_KEY = "hll";
 const std::string COLUMN_SEPARATOR_KEY = "column_separator";
 const std::string MAX_FILTER_RATIO_KEY = "max_filter_ratio";
+const std::string STRICT_MODE_KEY = "strict_mode";
 const std::string TIMEOUT_KEY = "timeout";
 
 const char* k_100_continue = "100-continue";
@@ -713,6 +714,17 @@ Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
     if (ctx->timeout_second != -1) {
         put_request.__set_timeout(ctx->timeout_second);
     }
+    auto strict_mode_it = params.find(STRICT_MODE_KEY);
+    if (strict_mode_it != params.end()) {
+        std::string strict_mode_value = strict_mode_it->second;
+        if (boost::iequals(strict_mode_value, "false")) {
+            put_request.__set_strictMode(false);
+        } else if (boost::iequals(strict_mode_value, "true")) {
+            put_request.__set_strictMode(true);
+        } else {
+            return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
+        }
+    }
 
     // plan this load
     TNetworkAddress master_addr = _exec_env->master_info()->network_address;
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 42e083ba36391b..38cec25e00e059 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -335,6 +335,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
     } else {
         request.__set_negative(false);
     }
+    if (!http_req->header(HTTP_STRICT_MODE).empty()) {
+        if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "false")) {
+            request.__set_strictMode(false);
+        } else if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "true")) {
+            request.__set_strictMode(true);
+        } else {
+            return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
+        }
+    }
 
     // plan this load
     TNetworkAddress master_addr = _exec_env->master_info()->network_address;
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index c6fc4b90f29b52..e7e0b8dc2f0e9f 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -32,6 +32,7 @@ static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
 static const std::string HTTP_TIMEOUT = "timeout";
 static const std::string HTTP_PARTITIONS = "partitions";
 static const std::string HTTP_NEGATIVE = "negative";
+static const std::string HTTP_STRICT_MODE = "strict_mode";
 
 static const std::string HTTP_100_CONTINUE = "100-continue";
 
diff --git a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md
index f25b2477d4211c..e791f3325efcaf 100644
--- a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md
+++ b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md
@@ -128,6 +128,44 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new tasks, or
 `data_source_properties` can specify the exact Kafka partitions to consume. If not specified, all partitions of the subscribed topic are consumed by default.
 
 Note that when partitions are specified explicitly, the load job no longer dynamically detects changes of Kafka partitions. If partitions are not specified, the set of consumed partitions is adjusted dynamically as Kafka partitions change.
+
+* strict\_mode
+
+    Routine load can enable strict mode. It is enabled by adding ```"strict_mode" = "true"``` to job\_properties. Strict mode is enabled by default.
+
+    Strict mode means that column type conversions during the load are strictly filtered. The strict filtering policy is as follows:
+
+    1. For column type conversions, if strict mode is true, erroneous data is filtered out. Erroneous data here means source values that are not null but become null after the column type conversion.
+
+    2. Strict mode does not affect a loaded column that is generated by a function transformation.
+
+    3. Strict mode also does not affect a loaded column whose type carries a range restriction, as long as the source value passes the type conversion, even if it fails the range restriction. For example, if the type is decimal(1,0) and the source value is 10, the value passes the type conversion but lies outside the declared column range. Strict mode does not affect such data.
+
+#### How strict mode interacts with source data
+
+Take a column of type TinyInt as an example
+
+>Note: when the column in the table allows loading null values
+
+|source data | source data example | string to int   | strict_mode        | result|
+|------------|---------------------|-----------------|--------------------|---------|
+|null value  | \N                  | N/A             | true or false      | NULL|
+|not null    | aaa or 2000         | NULL            | true               | invalid data(filtered)|
+|not null    | aaa                 | NULL            | false              | NULL|
+|not null    | 1                   | 1               | true or false      | correct data|
+
+Take a column of type Decimal(1,0) as an example
+
+>Note: when the column in the table allows loading null values
+
+|source data | source data example | string to decimal | strict_mode      | result|
+|------------|---------------------|-------------------|------------------|--------|
+|null value  | \N                  | N/A               | true or false    | NULL|
+|not null    | aaa                 | NULL              | true             | invalid data(filtered)|
+|not null    | aaa                 | NULL              | false            | NULL|
+|not null    | 1 or 10             | 1                 | true or false    | correct data|
+
+> Note: although 10 is a value beyond the declared range, strict mode does not affect it, because its type satisfies the decimal requirement. 10 is eventually filtered in other ETL processing steps, but it is not filtered by strict mode.
 
 #### Accessing an SSL-authenticated Kafka cluster
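The decision rule in the tables above is compact enough to state in code. The following self-contained Java sketch is illustrative only (it is not part of this patch, and every name in it is made up), but it reproduces the TinyInt table row by row:

```java
// Illustrative only. Mimics the strict-mode decision described in the tables
// above for a nullable TinyInt target column.
public class StrictModeFilterDemo {
    // "string to int" conversion: a null result means the cast produced NULL.
    static Integer toTinyInt(String src) {
        try {
            int v = Integer.parseInt(src);
            return (v >= -128 && v <= 127) ? v : null; // out of TinyInt range -> NULL
        } catch (NumberFormatException e) {
            return null; // not a number -> NULL
        }
    }

    // src == null models \N in the source data.
    static String load(String src, boolean strictMode) {
        if (src == null) {
            return "NULL"; // a null source value stays NULL regardless of strict mode
        }
        Integer converted = toTinyInt(src);
        if (converted != null) {
            return "correct data: " + converted;
        }
        // A non-null source value that converted to NULL is exactly what
        // strict mode filters.
        return strictMode ? "invalid data (filtered)" : "NULL";
    }

    public static void main(String[] args) {
        System.out.println(load(null, true));    // NULL
        System.out.println(load("aaa", true));   // invalid data (filtered)
        System.out.println(load("2000", true));  // invalid data (filtered)
        System.out.println(load("aaa", false));  // NULL
        System.out.println(load("1", false));    // correct data: 1
    }
}
```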
diff --git a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md
index 550d777cc707fd..d0b7d649cf0d43 100644
--- a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md
+++ b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md
@@ -113,7 +113,46 @@ Since stream load uses the HTTP protocol, all parameters related to the load task
     columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = mouth(tmp_c2)
     where tmp_* is a placeholder representing the two original columns of the source file.
     ```
+
++ strict\_mode
+
+    Stream load can enable strict mode. It is enabled by declaring ```strict_mode=true``` in the HEADER. Strict mode is enabled by default.
+
+    Strict mode means that column type conversions during the load are strictly filtered. The strict filtering policy is as follows:
+
+    1. For column type conversions, if strict mode is true, erroneous data is filtered out. Erroneous data here means source values that are not null but become null after the column type conversion.
+
+    2. Strict mode does not affect a loaded column that is generated by a function transformation.
+
+    3. Strict mode also does not affect a loaded column whose type carries a range restriction, as long as the source value passes the type conversion, even if it fails the range restriction. For example, if the type is decimal(1,0) and the source value is 10, the value passes the type conversion but lies outside the declared column range. Strict mode does not affect such data.
+
+#### How strict mode interacts with source data
+
+Take a column of type TinyInt as an example
+
+>Note: when the column in the table allows loading null values
+
+|source data | source data example | string to int   | strict_mode        | result|
+|------------|---------------------|-----------------|--------------------|---------|
+|null value  | \N                  | N/A             | true or false      | NULL|
+|not null    | aaa or 2000         | NULL            | true               | invalid data(filtered)|
+|not null    | aaa                 | NULL            | false              | NULL|
+|not null    | 1                   | 1               | true or false      | correct data|
+
+Take a column of type Decimal(1,0) as an example
+
+>Note: when the column in the table allows loading null values
+
+|source data | source data example | string to decimal | strict_mode      | result|
+|------------|---------------------|-------------------|------------------|--------|
+|null value  | \N                  | N/A               | true or false    | NULL|
+|not null    | aaa                 | NULL              | true             | invalid data(filtered)|
+|not null    | aaa                 | NULL              | false            | NULL|
+|not null    | 1 or 10             | 1                 | true or false    | correct data|
+
+> Note: although 10 is a value beyond the declared range, strict mode does not affect it, because its type satisfies the decimal requirement. 10 is eventually filtered in other ETL processing steps, but it is not filtered by strict mode.
+
 ### Return results
 
 Since stream load is a synchronous load method, the result of the load is returned directly to the user in the return value of the load creation request.
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/MINI LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/MINI LOAD.md
index 71abf0d313cd46..c1f5113ee4fbeb 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/MINI LOAD.md
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/MINI LOAD.md
@@ -60,6 +60,9 @@
     hll: Specifies the correspondence between the HLL columns in the data and in the table. The columns in the table and the columns named in the data
          (if columns is not specified, the columns in the data may also be other non-HLL columns of the table) are separated by ","
          Multiple hll columns are separated by ":", for example: 'hll1,cuid:hll2,device'
+
+    strict_mode: Specifies whether this load uses strict mode; the default is true. In strict mode, non-null source data whose column type conversion results in NULL is filtered out.
+                 Specified as 'strict_mode=false'
 
 NOTE:
     1. This load method currently performs all of the load work on a single machine, so it is not suitable for loading large volumes of data.
@@ -99,6 +102,9 @@
         curl -u root http://host:port/api/testDb/_load_info?label=123
+
+    8. Load with strict mode disabled
+        curl --location-trusted -u root -T testData http://host:port/api/testDb/testTbl/_load?label=123\&strict_mode=false
+
 ## keyword
     MINI, LOAD
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
index 56ad146528fe34..f618e824ad9b9d 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
@@ -111,6 +111,10 @@
             The maximum number of error rows allowed within the sampling window. Must be greater than or equal to 0. The default is 0, i.e. no error rows are allowed.
             The sampling window is max_batch_rows * 10. That is, if the number of error rows within the sampling window exceeds max_error_number, the routine job is paused and manual intervention is required to check for data quality problems.
             Rows filtered out by where conditions are not counted as error rows.
+
+        4. strict_mode
+
+            Whether to enable strict mode; enabled by default. If enabled, non-null source data whose column type conversion results in NULL is filtered out. Specified as "strict_mode" = "true"
 
         5. data_source
@@ -207,7 +211,7 @@ ## example
 
-    1. Create a Kafka routine load task named test1 for example_tbl of example_db.
+    1. Create a Kafka routine load task named test1 for example_tbl of example_db. The load task runs in strict mode.
 
         CREATE ROUTINE LOAD example_db.test1 ON example_tbl
         COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
         WHERE k1 > 100 and k2 like "%doris%",
@@ -217,7 +221,8 @@
         PROPERTIES
         (
             "desired_concurrent_number"="3",
             "max_batch_interval" = "20",
             "max_batch_rows" = "300000",
-            "max_batch_size" = "209715200"
+            "max_batch_size" = "209715200",
+            "strict_mode" = "true"
         )
         FROM KAFKA
         (
             "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
             "kafka_topic" = "my_topic",
             "kafka_partitions" = "0,1,2,3",
             "kafka_offsets" = "101,0,0,200"
         );
@@ -227,7 +232,7 @@
-    2. Load data from a Kafka cluster via SSL authentication, and also set the client.id parameter.
+    2. Load data from a Kafka cluster via SSL authentication, and also set the client.id parameter. The load task runs in non-strict mode.
 
         CREATE ROUTINE LOAD example_db.test1 ON example_tbl
         COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
         WHERE k1 > 100 and k2 like "%doris%",
@@ -237,7 +242,8 @@
         PROPERTIES
         (
             "desired_concurrent_number"="3",
             "max_batch_interval" = "20",
             "max_batch_rows" = "300000",
-            "max_batch_size" = "209715200"
+            "max_batch_size" = "209715200",
+            "strict_mode" = "false"
         )
         FROM KAFKA
         (
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md
index f0309025964c82..7da0c6e9b88cd9 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md
@@ -41,6 +41,8 @@
         For example, to load into partitions p1 and p2: -H "partitions: p1, p2"
 
         timeout: Specifies the load timeout, in seconds. The default is 600 seconds. The settable range is 1 second to 259200 seconds.
+
+        strict_mode: Specifies whether this load enables strict mode; enabled by default. Disable it with -H "strict_mode: false".
 
     RETURN VALUES
         After the load completes, the relevant content of this load is returned in JSON format. It currently includes the following fields
@@ -88,6 +90,10 @@
     7. Load into a table with HLL columns, which can be generated from columns of the table or columns in the data
         curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1)" -T testData http://host:port/api/testDb/testTbl/_stream_load
+
+    8. Load data with strict-mode filtering
+        curl --location-trusted -u root -H "strict_mode: true" -T testData http://host:port/api/testDb/testTbl/_stream_load
+
 ## keyword
     STREAM,LOAD
diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index f3ffc56cf89391..ede71bc5385b12 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -105,6 +105,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(MAX_BATCH_INTERVAL_SEC_PROPERTY)
             .add(MAX_BATCH_ROWS_PROPERTY)
             .add(MAX_BATCH_SIZE_PROPERTY)
+            .add(LoadStmt.STRICT_MODE)
             .build();
 
     private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new ImmutableSet.Builder<String>()
@@ -131,6 +132,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private long maxBatchIntervalS = -1;
    private long maxBatchRows = -1;
     private long maxBatchSizeBytes = -1;
+    private boolean strictMode = true;
 
     // kafka related properties
     private String kafkaBrokerList;
@@ -199,6 +201,10 @@ public long getMaxBatchSize() {
         return maxBatchSizeBytes;
     }
 
+    public boolean isStrictMode() {
+        return strictMode;
+    }
+
     public String getKafkaBrokerList() {
         return kafkaBrokerList;
     }
@@ -308,6 +314,10 @@ private void checkJobProperties() throws AnalysisException {
         maxBatchSizeBytes = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_SIZE_PROPERTY),
                 RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, MAX_BATCH_SIZE_PRED,
                 MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 1GB");
+
+        strictMode = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.STRICT_MODE),
+                RoutineLoadJob.DEFAULT_STRICT_MODE,
+                LoadStmt.STRICT_MODE + " should be a boolean");
     }
 
     private void checkDataSourceProperties() throws AnalysisException {
diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java
index c75809be124636..ba3ade3aadf166 100644
--- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -31,6 +31,7 @@
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -278,12 +279,14 @@ private void analyzeColumns() throws AnalysisException {
             }
             ImportColumnDesc importColumnDesc = new ImportColumnDesc(column, child1);
             parsedColumnExprList.add(importColumnDesc);
-            analyzeColumnToHadoopFunction(column, child1);
-
+            if (child1 instanceof FunctionCallExpr) {
+                analyzeColumnToHadoopFunction(column, child1);
+            }
         }
     }
 
     private void analyzeColumnToHadoopFunction(String columnName, Expr child1) throws AnalysisException {
+        Preconditions.checkState(child1 instanceof FunctionCallExpr);
         FunctionCallExpr functionCallExpr = (FunctionCallExpr) child1;
         String functionName = functionCallExpr.getFnName().getFunction();
         if (!hadoopSupportFunctionName.contains(functionName.toLowerCase())) {
diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java
index 95683e20ac3693..1c87c9926f49d9 100644
--- a/fe/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java
@@ -35,5 +35,5 @@ public class FeConstants {
     // general model
 
     // Current meta data version. Use this version to write journals and image
-    public static int meta_version = FeMetaVersion.VERSION_58;
+    public static int meta_version = FeMetaVersion.VERSION_59;
 }
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index b5bdb2a42d9edd..b997ba259f0853 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -126,4 +126,6 @@ public final class FeMetaVersion {
     public static final int VERSION_57 = 57;
     // broker load support function, persist origin stmt in broker load
     public static final int VERSION_58 = 58;
+    // support strict mode in routine load and stream load
+    public static final int VERSION_59 = 59;
 }
diff --git a/fe/src/main/java/org/apache/doris/common/util/Util.java b/fe/src/main/java/org/apache/doris/common/util/Util.java
index ed2f85f1e49600..465536879436c2 100644
--- a/fe/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/src/main/java/org/apache/doris/common/util/Util.java
@@ -391,5 +391,22 @@ public static long getLongPropertyOrDefault(String valStr, long defaultVal, Pred
         return result;
     }
+
+    public static boolean getBooleanPropertyOrDefault(String valStr, boolean defaultVal, String hintMsg)
+            throws AnalysisException {
+        if (Strings.isNullOrEmpty(valStr)) {
+            return defaultVal;
+        }
+
+        // Boolean.valueOf() never throws, so validate explicitly instead of
+        // letting arbitrary strings silently parse as false.
+        if (valStr.equalsIgnoreCase("true")) {
+            return true;
+        }
+        if (valStr.equalsIgnoreCase("false")) {
+            return false;
+        }
+        throw new AnalysisException(hintMsg);
+    }
 }
 
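A quick sanity check of the semantics above, written as a hypothetical driver that is not part of this patch (the exact text carried by `AnalysisException.getMessage()` is an assumption):

```java
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.Util;

public class BooleanPropertyDemo {
    public static void main(String[] args) throws AnalysisException {
        String hint = "strict_mode should be a boolean";

        // A missing property falls back to the supplied default.
        System.out.println(Util.getBooleanPropertyOrDefault(null, true, hint));    // true

        // "true"/"false" are accepted case-insensitively.
        System.out.println(Util.getBooleanPropertyOrDefault("FALSE", true, hint)); // false

        // Anything else is rejected rather than silently parsed as false.
        try {
            Util.getBooleanPropertyOrDefault("yes", true, hint);
        } catch (AnalysisException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```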
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index fd9b79cd85218d..801f63156ac4e8 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -22,6 +22,7 @@
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.ImportColumnsStmt;
+import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.catalog.Catalog;
@@ -33,6 +34,7 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
@@ -92,6 +94,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
     public static final long DEFAULT_MAX_BATCH_ROWS = 200000;
     public static final long DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB
+    public static final boolean DEFAULT_STRICT_MODE = true;
 
     protected static final String STAR_STRING = "*";
     /*
@@ -147,6 +150,8 @@ public boolean isFinalState() {
     // maxErrorNum / (maxBatchRows * 10) = max error rate of routine load job
     // if current error rate is more then max error rate, the job will be paused
     protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional
+    // job properties, including strict mode
+    protected Map<String, String> jobProperties = Maps.newHashMap();
 
     /*
      * The following 3 variables control the max execute time of a single task.
@@ -241,6 +246,7 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
         if (stmt.getMaxBatchSize() != -1) {
             this.maxBatchSizeBytes = stmt.getMaxBatchSize();
         }
+        jobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(stmt.isStrictMode()));
     }
 
     private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
@@ -366,6 +372,14 @@ public ColumnSeparator getColumnSeparator() {
         return columnSeparator;
     }
 
+    public boolean isStrictMode() {
+        String value = jobProperties.get(LoadStmt.STRICT_MODE);
+        if (value == null) {
+            return DEFAULT_STRICT_MODE;
+        }
+        return Boolean.valueOf(value);
+    }
+
     public RoutineLoadProgress getProgress() {
         return progress;
     }
@@ -1113,6 +1127,11 @@ public void write(DataOutput out) throws IOException {
         out.writeLong(abortedTaskNum);
 
         Text.writeString(out, origStmt);
+        out.writeInt(jobProperties.size());
+        for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
+            Text.writeString(out, entry.getKey());
+            Text.writeString(out, entry.getValue());
+        }
     }
 
     @Override
@@ -1160,6 +1179,18 @@ public void readFields(DataInput in) throws IOException {
 
         origStmt = Text.readString(in);
 
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_59) {
+            int size = in.readInt();
+            for (int i = 0; i < size; i++) {
+                String key = Text.readString(in);
+                String value = Text.readString(in);
+                jobProperties.put(key, value);
+            }
+        } else {
+            // Jobs created before strict mode existed keep their old non-strict behavior
+            jobProperties.put(LoadStmt.STRICT_MODE, Boolean.toString(false));
+        }
+
         // parse the origin stmt to get routine load desc
         SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt)));
         CreateRoutineLoadStmt stmt = null;
diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index b1742753a87b28..736097cb463e3e 100644
--- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -112,7 +112,7 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) {
     private Table targetTable;
     private BrokerDesc brokerDesc;
     private List<BrokerFileGroup> fileGroups;
-    private boolean strictMode;
+    private boolean strictMode = true;
 
     private List<List<TBrokerFileStatus>> fileStatusesList;
     // file num
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index ad64c514b895b5..9e9e77e1302c9a 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -118,6 +118,7 @@ public void init(Analyzer analyzer) throws UserException {
         srcTupleDesc = analyzer.getDescTbl().createTupleDescriptor("StreamLoadScanNode");
 
         TBrokerScanRangeParams params = new TBrokerScanRangeParams();
+        params.setStrict_mode(streamLoadTask.isStrictMode());
 
         // parse columns header. this contain map from input column to column of destination table
         // columns: k1, k2, v1, v2=k1 + k2
@@ -255,7 +256,7 @@ private void finalizeParams() throws UserException {
             if (expr == null) {
                 SlotDescriptor srcSlotDesc = slotDescByName.get(dstSlotDesc.getColumn().getName());
                 if (srcSlotDesc != null) {
-                    destSidToSrcSidWithoutTrans.put(srcSlotDesc.getId().asInt(), dstSlotDesc.getId().asInt());
+                    destSidToSrcSidWithoutTrans.put(dstSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
                     // If dest is allow null, we set source to nullable
                     if (dstSlotDesc.getColumn().isAllowNull()) {
                         srcSlotDesc.setIsNullable(true);
diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 57059fec8e4f51..6feb27e417c5f2 100644
--- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -57,6 +57,7 @@ public class StreamLoadTask {
     private String partitions;
     private String path;
     private boolean negative;
+    private boolean strictMode = true;
     private int timeout = Config.stream_load_default_timeout_second;
 
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
@@ -106,6 +107,10 @@ public boolean getNegative() {
         return negative;
     }
 
+    public boolean isStrictMode() {
+        return strictMode;
+    }
+
     public int getTimeout() {
         return timeout;
     }
@@ -143,6 +148,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws
         if (request.isSetTimeout()) {
             timeout = request.getTimeout();
         }
+        if (request.isSetStrictMode()) {
+            strictMode = request.isStrictMode();
+        }
     }
 
     public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
@@ -158,6 +166,7 @@ private void setOptionalFromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
         whereExpr = routineLoadJob.getWhereExpr();
         columnSeparator = routineLoadJob.getColumnSeparator();
         partitions = routineLoadJob.getPartitions() == null ? null : Joiner.on(",").join(routineLoadJob.getPartitions());
+        strictMode = routineLoadJob.isStrictMode();
     }
 
     private void setColumnToColumnExpr(String columns) throws UserException {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 8e59d191d85e23..33b6a3abe1aa62 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -523,6 +523,7 @@ struct TStreamLoadPutRequest {
     16: optional i64 auth_code
     17: optional bool negative
     18: optional i32 timeout
+    19: optional bool strictMode
 }
 
 struct TStreamLoadPutResult {
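End to end, the new thrift field ties the pieces together: the BE HTTP layer validates `strict_mode` and sets `strictMode` on `TStreamLoadPutRequest`; the FE copies it into `StreamLoadTask` in `setOptionalFromTSLPutRequest()`, and `StreamLoadScanNode` forwards it to the BE via `TBrokerScanRangeParams`. A rough wiring sketch follows; the `fromTStreamLoadPutRequest` factory name and the thrift-generated `setStrictMode` setter are assumptions inferred from the accessors visible in this diff:

```java
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TBrokerScanRangeParams;
import org.apache.doris.thrift.TStreamLoadPutRequest;

public class StrictModeWiringDemo {
    public static void main(String[] args) throws Exception {
        // What the BE builds after parsing -H "strict_mode: false" on a stream load.
        TStreamLoadPutRequest request = new TStreamLoadPutRequest();
        request.setStrictMode(false); // thrift-generated setter (assumed name)

        // The FE picks the flag up in setOptionalFromTSLPutRequest().
        StreamLoadTask task = StreamLoadTask.fromTStreamLoadPutRequest(request); // factory name assumed
        System.out.println(task.isStrictMode()); // false

        // StreamLoadScanNode.init() then forwards it to the scan range params.
        TBrokerScanRangeParams params = new TBrokerScanRangeParams();
        params.setStrict_mode(task.isStrictMode());
    }
}
```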