2 changes: 1 addition & 1 deletion be/src/exec/broker_reader.cpp
@@ -123,7 +123,7 @@ Status BrokerReader::open() {

//not support
Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
return Status::NotSupported("Not support");
return Status::NotSupported("broker reader doesn't support read_one_message interface");
}

Status BrokerReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
@@ -73,6 +73,7 @@ under the License.
[SET (k1 = func(k2))]
[WHERE predicate]
[DELETE ON label=true]
[read_properties]

Explain:
file_path:
@@ -132,6 +133,35 @@ under the License.
delete_on_predicates:

Only used when merge type is MERGE

read_properties:

Used to specify some special parameters.
Syntax:
[PROPERTIES ("key"="value", ...)]

You can specify the following parameters:

line_delimiter: Used to specify the line delimiter in the load file. The default is `\n`. A combination of multiple characters can be used as the line delimiter.

fuzzy_parse: Boolean type. If true, the JSON schema is parsed from the first row, which can speed up the import, but it requires the keys of every row to keep the same order as the first row. The default value is false. Only used for the json format.

jsonpaths: There are two modes for importing json: simple mode and matched mode.
simple mode: the mode used when the jsonpaths parameter is not set. In this mode, the json data must be of object type. For example:
{"k1": 1, "k2": 2, "k3": "hello"}, where k1, k2, k3 are column names.

matched mode: used when the json data is relatively complex and the corresponding values need to be matched through the jsonpaths parameter (see the sketch after this parameter list).

strip_outer_array: Boolean type. If true, the json data is expected to start with an array, and the objects inside the array are flattened. The default value is false. For example:
[
{"k1" : 1, "v1" : 2},
{"k1" : 3, "v1" : 4}
]
With strip_outer_array set to true, two rows of data are generated when this is imported into Doris.

json_root: json_root is a valid JSONPATH string that specifies the root node of the JSON Document. The default value is "".

num_as_string: Boolean type. If true, numbers are parsed as strings when the json data is parsed, and are then imported without loss of precision. The default value is false.
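
For matched mode, a minimal sketch is shown below. The label, column names, and paths are illustrative only and not part of this change; the jsonpaths are applied under the node selected by json_root.

LOAD LABEL example_db.label_jsonpaths
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
INTO TABLE `my_table`
FORMAT AS "json"
(k1, k2)
PROPERTIES ("json_root"="$.data", "jsonpaths"="[\"$.id\", \"$.detail.value\"]")
)
WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

With input {"data": {"id": 1, "detail": {"value": 2}}}, this would load one row with k1 = 1 and k2 = 2.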

3. broker_name

@@ -487,6 +517,18 @@ under the License.
WHERE k1 > 3
)
with BROKER "hdfs" ("username"="user", "password"="pass");

15. Import data from a json file, specifying the format as json (by default the format is inferred from the file suffix), and set parameters for reading the data

LOAD LABEL example_db.label9
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
INTO TABLE `my_table`
FORMAT AS "json"
(k1, k2, k3)
properties("fuzzy_parse"="true", "strip_outer_array"="true")
)
WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

## keyword

@@ -73,6 +73,7 @@ under the License.
[WHERE predicate]
[DELETE ON label=true]
[ORDER BY source_sequence]
[read_properties]

Explain:
file_path:
@@ -132,6 +133,34 @@ under the License.

Only applicable to UNIQUE_KEYS tables. Under the same key columns, it guarantees that value columns are replaced according to source_sequence. source_sequence can be a column in the data source or a column in the table schema.

read_properties:

Used to specify some special parameters.
Syntax:
[PROPERTIES ("key"="value", ...)]

The following parameters can be specified:

line_delimiter: Used to specify the line delimiter in the load file. The default is `\n`. A combination of multiple characters can be used as the line delimiter.

fuzzy_parse: Boolean type. If true, the JSON schema is parsed from the first row; enabling this option can improve JSON import efficiency, but it requires the keys of every JSON object to be in the same order as in the first row. The default value is false. Only used for the json format.

jsonpaths: There are two modes for importing json: simple mode and matched mode.
simple mode: the mode used when the jsonpaths parameter is not set. In this mode, the json data must be of object type, for example:
{"k1":1, "k2":2, "k3":"hello"}, where k1, k2, k3 are column names.
matched mode: used when the json data is relatively complex and the corresponding values need to be matched through the jsonpaths parameter.

strip_outer_array: Boolean type. If true, the json data is expected to start with an array, and the objects inside the array are flattened. The default value is false. For example:
[
{"k1" : 1, "v1" : 2},
{"k1" : 3, "v1" : 4}
]
With strip_outer_array set to true, two rows of data are generated when this is imported into Doris.

json_root: a valid JSONPATH string that specifies the root node of the JSON document. The default value is "".

num_as_string: Boolean type. If true, numbers are converted to strings when parsing the json data, and are then imported without loss of precision (see the sketch after this list).
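
As an illustration only (the label is hypothetical and not part of this change), json_root, strip_outer_array, and num_as_string can be combined like this:

LOAD LABEL example_db.label_json_root
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
INTO TABLE `my_table`
FORMAT AS "json"
(k1, v1)
PROPERTIES ("json_root"="$.records", "strip_outer_array"="true", "num_as_string"="true")
)
WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

With input {"records": [{"k1": 1, "v1": 2.4527297}, {"k1": 3, "v1": 4}]}, the array under records is flattened into two rows, and numbers are parsed as strings so that v1 keeps its full precision.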

3. broker_name

The name of the broker to use, which can be viewed with the `show broker` command.
@@ -506,6 +535,18 @@ under the License.
)
with BROKER "hdfs" ("username"="user", "password"="pass");

15. Import data from a json file, specifying FORMAT as json (by default the format is inferred from the file suffix), and set parameters for reading the data

LOAD LABEL example_db.label9
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
INTO TABLE `my_table`
FORMAT AS "json"
(k1, k2, k3)
properties("fuzzy_parse"="true", "strip_outer_array"="true")
)
WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

## keyword

BROKER,LOAD
6 changes: 4 additions & 2 deletions fe/fe-core/src/main/cup/sql_parser.cup
@@ -1414,9 +1414,10 @@ data_desc ::=
where_clause:whereExpr
delete_on_clause:deleteExpr
sequence_col_clause:sequenceColName
opt_properties:properties
{:
RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat,
- columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName);
+ columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName, properties);
:}
| opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName
opt_negative:isNeg
@@ -1425,9 +1426,10 @@
opt_col_mapping_list:colMappingList
where_clause:whereExpr
delete_on_clause:deleteExpr
opt_properties:properties
{:
RESULT = new DataDescription(tableName, partitionNames, srcTableName, isNeg, colMappingList, whereExpr,
- mergeType, deleteExpr);
+ mergeType, deleteExpr, properties);
:}
;
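
To illustrate the extended grammar, a data_desc that the new opt_properties production accepts might look like this (table, column, and path names are illustrative):

DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
INTO TABLE `my_table`
FORMAT AS "json"
(k1, k2, source_seq)
ORDER BY source_seq
PROPERTIES ("num_as_string"="true")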

@@ -120,6 +120,7 @@ public class DataDescription {
private String jsonRoot = "";
private boolean fuzzyParse = false;
private boolean readJsonByLine = false;
private boolean numAsString = false;

private String sequenceCol;

@@ -137,6 +138,7 @@ public class DataDescription {

private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
private final Expr deleteCondition;
private Map<String, String> properties;

public DataDescription(String tableName,
PartitionNames partitionNames,
@@ -147,7 +149,7 @@ public DataDescription(String tableName,
boolean isNegative,
List<Expr> columnMappingList) {
this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null,
- isNegative, columnMappingList, null, null, LoadTask.MergeType.APPEND, null, null);
+ isNegative, columnMappingList, null, null, LoadTask.MergeType.APPEND, null, null, null);
}

public DataDescription(String tableName,
@@ -163,7 +165,8 @@ public DataDescription(String tableName,
Expr whereExpr,
LoadTask.MergeType mergeType,
Expr deleteCondition,
- String sequenceColName) {
+ String sequenceColName,
+ Map<String, String> properties) {
this.tableName = tableName;
this.partitionNames = partitionNames;
this.filePaths = filePaths;
@@ -179,6 +182,7 @@ public DataDescription(String tableName,
this.mergeType = mergeType;
this.deleteCondition = deleteCondition;
this.sequenceCol = sequenceColName;
this.properties = properties;
}

// data from table external_hive_table
@@ -189,7 +193,8 @@ public DataDescription(String tableName,
List<Expr> columnMappingList,
Expr whereExpr,
LoadTask.MergeType mergeType,
- Expr deleteCondition) {
+ Expr deleteCondition,
+ Map<String, String> properties) {
this.tableName = tableName;
this.partitionNames = partitionNames;
this.filePaths = null;
@@ -204,6 +209,7 @@ public DataDescription(String tableName,
this.srcTableName = srcTableName;
this.mergeType = mergeType;
this.deleteCondition = deleteCondition;
this.properties = properties;
}

public static void validateMappingFunction(String functionName, List<String> args,
Expand Down Expand Up @@ -499,12 +505,12 @@ public void setFuzzyParse(boolean fuzzyParse) {
this.fuzzyParse = fuzzyParse;
}

- public boolean isReadJsonByLine() {
-     return readJsonByLine;
+ public boolean isNumAsString() {
+     return numAsString;
}

- public void setReadJsonByLine(boolean readJsonByLine) {
-     this.readJsonByLine = readJsonByLine;
+ public void setNumAsString(boolean numAsString) {
+     this.numAsString = numAsString;
}

public String getJsonPaths() {
@@ -755,6 +761,36 @@ private void analyzeSequenceCol(String fullDbName) throws AnalysisException {
}
}

private void analyzeProperties() throws AnalysisException {
Map<String, String> analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
analysisMap.putAll(properties);

if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)) {
lineDelimiter = new Separator(analysisMap.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER));
lineDelimiter.analyze();
}

if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE)) {
fuzzyParse = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE));
}

if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY)) {
stripOuterArray = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY));
}

if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_JSONPATHS)) {
jsonPaths = analysisMap.get(LoadStmt.KEY_IN_PARAM_JSONPATHS);
}

if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_JSONROOT)) {
jsonRoot = analysisMap.get(LoadStmt.KEY_IN_PARAM_JSONROOT);
}

if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_NUM_AS_STRING)) {
numAsString = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_NUM_AS_STRING));
}
}

private void checkLoadPriv(String fullDbName) throws AnalysisException {
if (Strings.isNullOrEmpty(tableName)) {
throw new AnalysisException("No table name in load statement.");
@@ -817,6 +853,10 @@ public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException
analyzeColumns();
analyzeMultiLoadColumns();
analyzeSequenceCol(fullDbName);

if (properties != null) {
analyzeProperties();
}
}

/*
@@ -108,6 +108,7 @@ public class LoadStmt extends DdlStmt {
public static final String KEY_IN_PARAM_JSONROOT = "json_root";
public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = "strip_outer_array";
public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse";
public static final String KEY_IN_PARAM_NUM_AS_STRING = "num_as_string";
public static final String KEY_IN_PARAM_MERGE_TYPE = "merge_type";
public static final String KEY_IN_PARAM_DELETE_CONDITION = "delete";
public static final String KEY_IN_PARAM_FUNCTION_COLUMN = "function_column";
@@ -103,6 +103,7 @@ public class BrokerFileGroup implements Writable {
private String jsonRoot = "";
private boolean fuzzyParse = true;
private boolean readJsonByLine = false;
private boolean numAsString = false;

// for unit test and edit log persistence
private BrokerFileGroup() {
@@ -237,7 +238,9 @@ public void parse(Database db, DataDescription dataDescription) throws DdlExcept
jsonPaths = dataDescription.getJsonPaths();
jsonRoot = dataDescription.getJsonRoot();
fuzzyParse = dataDescription.isFuzzyParse();
- readJsonByLine = dataDescription.isReadJsonByLine();
+ // For broker load, we only support reading json format data line by line, so we set readJsonByLine to true here.
+ readJsonByLine = true;
+ numAsString = dataDescription.isNumAsString();
}
}

@@ -357,6 +360,14 @@ public void setReadJsonByLine(boolean readJsonByLine) {
this.readJsonByLine = readJsonByLine;
}

public boolean isNumAsString() {
return numAsString;
}

public void setNumAsString(boolean numAsString) {
this.numAsString = numAsString;
}

public String getJsonPaths() {
return jsonPaths;
}
@@ -445,6 +445,7 @@ private void processFileGroup(
rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths());
rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot());
rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse());
rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
}
brokerScanRange(curLocations).addToRanges(rangeDesc);
@@ -471,6 +472,8 @@ private void processFileGroup(
rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths());
rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot());
rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse());
rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
}
brokerScanRange(curLocations).addToRanges(rangeDesc);
curFileOffset = 0;
@@ -509,7 +509,7 @@ public DataDescription toDataDesc() throws DdlException {
}
DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator,
fileFormat, null, isNegative, null, null, whereExpr, mergeType, deleteCondition,
- sequenceColName);
+ sequenceColName, null);
dataDescription.setColumnDef(colString);
backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
if (backend == null) {