12 changes: 12 additions & 0 deletions be/src/http/action/mini_load.cpp
@@ -112,6 +112,7 @@ const std::string COLUMNS_KEY = "columns";
const std::string HLL_KEY = "hll";
const std::string COLUMN_SEPARATOR_KEY = "column_separator";
const std::string MAX_FILTER_RATIO_KEY = "max_filter_ratio";
const std::string STRICT_MODE_KEY = "strict_mode";
const std::string TIMEOUT_KEY = "timeout";
const char* k_100_continue = "100-continue";

@@ -713,6 +714,17 @@ Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
    if (ctx->timeout_second != -1) {
        put_request.__set_timeout(ctx->timeout_second);
    }
    auto strict_mode_it = params.find(STRICT_MODE_KEY);
    if (strict_mode_it != params.end()) {
        // Accept only "true"/"false" (case-insensitive); reject anything else.
        std::string strict_mode_value = strict_mode_it->second;
        if (boost::iequals(strict_mode_value, "false")) {
            put_request.__set_strictMode(false);
        } else if (boost::iequals(strict_mode_value, "true")) {
            put_request.__set_strictMode(true);
        } else {
            return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
        }
    }

    // plan this load
    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
9 changes: 9 additions & 0 deletions be/src/http/action/stream_load.cpp
@@ -335,6 +335,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* ctx) {
    } else {
        request.__set_negative(false);
    }
    if (!http_req->header(HTTP_STRICT_MODE).empty()) {
        // Same validation as mini load: only "true"/"false" (case-insensitive) is accepted.
        if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "false")) {
            request.__set_strictMode(false);
        } else if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "true")) {
            request.__set_strictMode(true);
        } else {
            return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
        }
    }

    // plan this load
    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
1 change: 1 addition & 0 deletions be/src/http/http_common.h
@@ -32,6 +32,7 @@ static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
static const std::string HTTP_TIMEOUT = "timeout";
static const std::string HTTP_PARTITIONS = "partitions";
static const std::string HTTP_NEGATIVE = "negative";
static const std::string HTTP_STRICT_MODE = "strict_mode";

static const std::string HTTP_100_CONTINUE = "100-continue";

@@ -128,6 +128,44 @@ Based on the reported results, the JobScheduler in the FE continues to generate new subsequent tasks, or
`data_source_properties` can be used to specify the exact Kafka partitions to consume. If not specified, all partitions of the subscribed topic are consumed by default.

Note that when partitions are specified explicitly, the load job no longer dynamically detects changes in the Kafka partitions. If they are not specified, the set of consumed partitions is adjusted dynamically as the Kafka partitions change.

* strict\_mode

Routine load can be run in strict mode. To enable it, add ```"strict_mode" = "true"``` to job\_properties. Strict mode is enabled by default.

Strict mode means that column type conversions performed during the load are strictly filtered. The filtering policy is as follows (a small sketch follows this list):

1. For column type conversions, if strict mode is true, erroneous data is filtered out. Erroneous data here means rows whose original value is not null but whose result becomes null after the column type conversion.

2. When a loaded column is generated by a function transformation, strict mode has no effect on it.

3. When a loaded column type carries a range restriction, strict mode likewise has no effect on values that pass the type conversion but fail the range check. For example: if the type is decimal(1,0) and the original value is 10, the value passes the type conversion but falls outside the declared range of the column; strict mode does not affect such data.
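
The following is a minimal, hypothetical Java sketch (not Doris source code) of the per-value decision that the three rules above describe; only the branching logic matters:

```
public class StrictModeSketch {
    enum RowResult { KEPT, NULL_VALUE, FILTERED }

    // sourceIsNull:    the original field was a real null (\N)
    // convertedIsNull: the value became NULL during the column type conversion
    static RowResult judge(boolean sourceIsNull, boolean convertedIsNull, boolean strictMode) {
        if (sourceIsNull) {
            return RowResult.NULL_VALUE;  // real nulls pass through in both modes
        }
        if (convertedIsNull) {
            // A non-null source value turned into NULL by the cast:
            // strict mode filters the row, non-strict mode keeps it as NULL.
            return strictMode ? RowResult.FILTERED : RowResult.NULL_VALUE;
        }
        // Conversion succeeded. Range checks (e.g. 10 vs. decimal(1,0))
        // happen later, outside of strict mode.
        return RowResult.KEPT;
    }

    public static void main(String[] args) {
        System.out.println(judge(true, true, true));    // \N    -> NULL_VALUE
        System.out.println(judge(false, true, true));   // "aaa" -> FILTERED
        System.out.println(judge(false, true, false));  // "aaa" -> NULL_VALUE
        System.out.println(judge(false, false, true));  // "1"   -> KEPT
    }
}
```

The tables in the following section enumerate the same cases concretely.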

#### How strict mode interacts with source data

Take a column of type TinyInt as an example:

> Note: when the column in the table allows null values to be loaded.

|source data | source data example | string to int | strict_mode | result|
|------------|---------------------|-----------------|--------------------|---------|
|null value | \N | N/A | true or false | NULL|
|not null | aaa or 2000 | NULL | true | invalid data(filtered)|
|not null | aaa | NULL | false | NULL|
|not null | 1 | 1 | true or false | correct data|

Take a column of type Decimal(1,0) as an example:

> Note: when the column in the table allows null values to be loaded.

|source data | source data example | string to decimal | strict_mode | result|
|------------|---------------------|-----------------|--------------------|--------|
|null value | \N | N/A | true or false | NULL|
|not null | aaa | NULL | true | invalid data(filtered)|
|not null | aaa | NULL | false | NULL|
|not null | 1 or 10 | 1 | true or false | correct data|

> Note: although 10 is a value that exceeds the range, strict mode does not affect it because its type satisfies the decimal requirement. 10 will eventually be filtered in other ETL processing steps, but it is not filtered by strict mode.

#### Accessing an SSL-authenticated Kafka cluster

@@ -113,7 +113,46 @@ Since stream load uses the HTTP protocol, all parameters related to the load task are
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
Here tmp_* is a placeholder representing the two original columns in the source file.
```

+ strict\_mode

Stream load can be run in strict mode. To enable it, declare ```strict_mode=true``` in the HTTP header. Strict mode is enabled by default. (A Java sketch of sending this header appears at the end of this section.)

Strict mode means that column type conversions performed during the load are strictly filtered. The filtering policy is as follows:

1. For column type conversions, if strict mode is true, erroneous data is filtered out. Erroneous data here means rows whose original value is not null but whose result becomes null after the column type conversion.

2. When a loaded column is generated by a function transformation, strict mode has no effect on it.

3. When a loaded column type carries a range restriction, strict mode likewise has no effect on values that pass the type conversion but fail the range check. For example: if the type is decimal(1,0) and the original value is 10, the value passes the type conversion but falls outside the declared range of the column; strict mode does not affect such data.

#### How strict mode interacts with source data

Take a column of type TinyInt as an example:

> Note: when the column in the table allows null values to be loaded.

|source data | source data example | string to int | strict_mode | result|
|------------|---------------------|-----------------|--------------------|---------|
|null value | \N | N/A | true or false | NULL|
|not null | aaa or 2000 | NULL | true | invalid data(filtered)|
|not null | aaa | NULL | false | NULL|
|not null | 1 | 1 | true or false | correct data|

Take a column of type Decimal(1,0) as an example:

> Note: when the column in the table allows null values to be loaded.

|source data | source data example | string to decimal | strict_mode | result|
|------------|---------------------|-----------------|--------------------|--------|
|null value | \N | N/A | true or false | NULL|
|not null | aaa | NULL | true | invalid data(filtered)|
|not null | aaa | NULL | false | NULL|
|not null | 1 or 10 | 1 | true or false | correct data|

> Note: although 10 is a value that exceeds the range, strict mode does not affect it because its type satisfies the decimal requirement. 10 will eventually be filtered in other ETL processing steps, but it is not filtered by strict mode.
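
As a usage illustration, here is a minimal Java sketch that sends the strict_mode header, mirroring the curl examples in this document. The endpoint, label, and credentials are placeholders, and the FE's redirect of the load to a BE (which curl handles via --location-trusted) is not handled here:

```
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

public class StreamLoadStrictModeClient {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint: http://fe_host:http_port/api/{db}/{table}/_stream_load
        URL url = new URL("http://127.0.0.1:8030/api/testDb/testTbl/_stream_load");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);

        String auth = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + auth);
        conn.setRequestProperty("label", "123");
        // Only "true" or "false" (case-insensitive) is accepted; any other
        // value fails with "Invalid strict mode format. Must be bool type".
        conn.setRequestProperty("strict_mode", "true");

        try (OutputStream os = conn.getOutputStream()) {
            os.write(Files.readAllBytes(Paths.get("testData")));
        }
        System.out.println(conn.getResponseCode() + " " + conn.getResponseMessage());
    }
}
```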


### Return results

Since stream load is a synchronous load method, the load result is returned to the user directly in the return value of the request that created the load.

@@ -60,6 +60,9 @@
hll: Specifies the mapping between the HLL columns in the data and in the table: the column in the table and the column
     specified in the data (if columns is not specified, the specified data column may also be any other non-HLL column
     of the table), separated by ",". Multiple hll columns are separated by ":", for example: 'hll1,cuid:hll2,device'

strict_mode: Specifies whether this load runs in strict mode; the default is true. In strict mode, a non-null original
     value whose result becomes NULL after column type conversion is filtered out. Specified as 'strict_mode=false'

NOTE:
    1. This load method currently performs the entire load on a single machine, so it is not suitable for loading large volumes of data.

@@ -99,6 +102,9 @@

curl -u root http://host:port/api/testDb/_load_info?label=123

8. Load with strict mode disabled
curl --location-trusted -u root -T testData http://host:port/api/testDb/testTbl/_load?label=123\&strict_mode=false

## keyword
MINI, LOAD

@@ -111,6 +111,10 @@
    The maximum number of error rows allowed within the sampling window. Must be greater than or equal to 0. The default is 0, i.e. no error rows are allowed.
    The sampling window is max_batch_rows * 10; for example, with max_batch_rows = 300000 the window is 3,000,000 rows. If the number of error rows inside the sampling window exceeds max_error_number, the routine job is paused, and manual intervention is needed to check for data quality problems.
    Rows filtered out by the where condition do not count as error rows.

4. strict_mode

    Whether to enable strict mode; enabled by default. When enabled, a non-null original value whose column type conversion results in NULL is filtered out. Specified as "strict_mode" = "true"

5. data_source

@@ -207,7 +211,7 @@

## example

1. Create a Kafka routine load task named test1 for example_tbl of example_db.
1. Create a Kafka routine load task named test1 for example_tbl of example_db. The load task runs in strict mode.

CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
@@ -217,7 +221,8 @@
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
@@ -227,7 +232,7 @@
"kafka_offsets" = "101,0,0,200"
);

2. Load data from a Kafka cluster with SSL authentication, and set the client.id parameter.
2. Load data from a Kafka cluster with SSL authentication, and set the client.id parameter. The load task runs in non-strict mode.

CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
@@ -237,7 +242,8 @@
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
@@ -41,6 +41,8 @@
    For example, to load into partitions p1 and p2: -H "partitions: p1, p2"

timeout: Specifies the load timeout, in seconds. The default is 600 seconds. The configurable range is 1 second to 259200 seconds.

strict_mode: Specifies whether this load runs in strict mode; enabled by default. Disable it with -H "strict_mode: false".

RETURN VALUES
    After the load completes, the relevant content of the load is returned in JSON format. It currently includes the following fields

@@ -88,6 +90,10 @@

7. Load into a table containing HLL columns, which can be generated from columns in the table or from columns in the data
curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1)" -T testData http://host:port/api/testDb/testTbl/_stream_load

8. Load data with strict mode filtering enabled
curl --location-trusted -u root -H "strict_mode: true" -T testData http://host:port/api/testDb/testTbl/_stream_load


## keyword
STREAM,LOAD
@@ -105,6 +105,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
            .add(MAX_BATCH_INTERVAL_SEC_PROPERTY)
            .add(MAX_BATCH_ROWS_PROPERTY)
            .add(MAX_BATCH_SIZE_PROPERTY)
            .add(LoadStmt.STRICT_MODE)

Contributor: should we update our Document about this property?

Contributor Author: I will update.

            .build();

    private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new ImmutableSet.Builder<String>()
@@ -131,6 +132,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
    private long maxBatchIntervalS = -1;
    private long maxBatchRows = -1;
    private long maxBatchSizeBytes = -1;
    private boolean strictMode = true;

    // kafka related properties
    private String kafkaBrokerList;

@@ -199,6 +201,10 @@ public long getMaxBatchSize() {
        return maxBatchSizeBytes;
    }

    public boolean isStrictMode() {
        return strictMode;
    }

    public String getKafkaBrokerList() {
        return kafkaBrokerList;
    }

@@ -308,6 +314,10 @@ private void checkJobProperties() throws AnalysisException {
        maxBatchSizeBytes = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_SIZE_PROPERTY),
                RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, MAX_BATCH_SIZE_PRED,
                MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 1GB");

        strictMode = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.STRICT_MODE),
                RoutineLoadJob.DEFAULT_STRICT_MODE,
                LoadStmt.STRICT_MODE + " should be a boolean");
    }

    private void checkDataSourceProperties() throws AnalysisException {

@@ -31,6 +31,7 @@

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -278,12 +279,14 @@ private void analyzeColumns() throws AnalysisException {
            }
            ImportColumnDesc importColumnDesc = new ImportColumnDesc(column, child1);
            parsedColumnExprList.add(importColumnDesc);
            analyzeColumnToHadoopFunction(column, child1);

            if (child1 instanceof FunctionCallExpr) {
                analyzeColumnToHadoopFunction(column, child1);
            }
        }
    }

    private void analyzeColumnToHadoopFunction(String columnName, Expr child1) throws AnalysisException {
        Preconditions.checkState(child1 instanceof FunctionCallExpr);
        FunctionCallExpr functionCallExpr = (FunctionCallExpr) child1;
        String functionName = functionCallExpr.getFnName().getFunction();
        if (!hadoopSupportFunctionName.contains(functionName.toLowerCase())) {
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/common/FeConstants.java
@@ -35,5 +35,5 @@ public class FeConstants {

    // general model
    // Current meta data version. Use this version to write journals and image
    public static int meta_version = FeMetaVersion.VERSION_58;
    public static int meta_version = FeMetaVersion.VERSION_59;
}
2 changes: 2 additions & 0 deletions fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -126,4 +126,6 @@ public final class FeMetaVersion {
    public static final int VERSION_57 = 57;
    // broker load support function, persist origin stmt in broker load
    public static final int VERSION_58 = 58;
    // support strict mode in routine load and stream load
    public static final int VERSION_59 = 59;
}
14 changes: 14 additions & 0 deletions fe/src/main/java/org/apache/doris/common/util/Util.java
@@ -391,5 +391,19 @@ public static long getLongPropertyOrDefault(String valStr, long defaultVal, Predicate<Long> pred, String hintMsg)

        return result;
    }

    public static boolean getBooleanPropertyOrDefault(String valStr, boolean defaultVal, String hintMsg)
            throws AnalysisException {
        if (Strings.isNullOrEmpty(valStr)) {
            return defaultVal;
        }

        // Boolean.valueOf() never throws (it maps any non-"true" string to false),
        // so validate explicitly instead of catching NumberFormatException,
        // which would be dead code and would silently turn bad input into false.
        if (valStr.equalsIgnoreCase("true")) {
            return true;
        } else if (valStr.equalsIgnoreCase("false")) {
            return false;
        }
        throw new AnalysisException(hintMsg);
    }
}
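
Below is a hedged usage sketch of the helper, with the explicit true/false validation above; it assumes the fe classpath, i.e. org.apache.doris.common.util.Util and org.apache.doris.common.AnalysisException:

```
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.Util;

public class BooleanPropertySketch {
    public static void main(String[] args) throws AnalysisException {
        String hint = "strict_mode should be a boolean";
        // Missing or empty property -> the supplied default.
        System.out.println(Util.getBooleanPropertyOrDefault(null, true, hint));    // true
        // Parsing is case-insensitive.
        System.out.println(Util.getBooleanPropertyOrDefault("FALSE", true, hint)); // false
        // Anything that is not "true"/"false" is rejected with the hint message.
        try {
            Util.getBooleanPropertyOrDefault("yes", true, hint);
        } catch (AnalysisException e) {
            System.out.println(e.getMessage()); // message carries the hint
        }
    }
}
```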
