diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp
index a720704ba41cca..65d601aa69a821 100644
--- a/be/src/exec/broker_reader.cpp
+++ b/be/src/exec/broker_reader.cpp
@@ -123,7 +123,7 @@ Status BrokerReader::open() {

 //not support
 Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
-    return Status::NotSupported("Not support");
+    return Status::NotSupported("broker reader doesn't support read_one_message interface");
 }

 Status BrokerReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index bb052724544137..90a7fdefe7c920 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
@@ -73,6 +73,7 @@ under the License.
        [SET (k1 = func(k2))]
        [WHERE predicate]
        [DELETE ON label=true]
+        [read_properties]

    Explain:
        file_path:
@@ -132,6 +133,35 @@ under the License.
        delete_on_predicates:
            Only used when merge type is MERGE

+        read_properties:
+
+            Used to specify some special parameters.
+            Syntax:
+            [PROPERTIES ("key"="value", ...)]
+
+            You can specify the following parameters:
+
+            line_delimiter: Used to specify the line delimiter in the load file. The default is `\n`. A combination of multiple characters can be used as the line delimiter.
+
+            fuzzy_parse: Boolean type. If true, the json schema is parsed from the first row, which can make the import faster, but it requires the keys of all rows to be in the same order as in the first row. The default value is false. Only used for the json format.
+
+            jsonpaths: There are two ways to import json: simple mode and matched mode.
+                simple mode: used when the jsonpaths parameter is not set. In this mode the json data is required to be of object type. For example:
+                {"k1": 1, "k2": 2, "k3": "hello"}, where k1, k2, k3 are column names.
+
+                matched mode: used when the json data is relatively complex and the corresponding values need to be matched through the jsonpaths parameter.
+
+            strip_outer_array: Boolean type. If true, the json data starts with an array object and the objects in the array are flattened, one per row. The default value is false. For example:
+                [
+                    {"k1" : 1, "v1" : 2},
+                    {"k1" : 3, "v1" : 4}
+                ]
+                With strip_outer_array set to true, two rows of data are generated when this is imported into Doris.
+
+            json_root: json_root is a valid jsonpath string that specifies the root node of the json document. The default value is "".
+
+            num_as_string: Boolean type. If true, numbers are converted to strings when the json data is parsed, so that they can be imported without loss of precision.

    3. broker_name
@@ -487,6 +517,18 @@ under the License.
        WHERE k1 > 3
    )
    with BROKER "hdfs" ("username"="user", "password"="pass");
+
+    15. Load data from a json file, specifying the format as json explicitly (by default it is inferred from the file suffix), and set parameters for reading the data.
+
+        LOAD LABEL example_db.label9
+        (
+            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+            INTO TABLE `my_table`
+            FORMAT AS "json"
+            (k1, k2, k3)
+            properties("fuzzy_parse"="true", "strip_outer_array"="true")
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

 ## keyword
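Side note on `num_as_string`: JSON parsers typically decode numbers as IEEE-754 doubles, which cannot represent every 64-bit integer exactly. A minimal, JDK-only sketch of the failure mode this option avoids (the 2^53 + 1 value is an illustrative pick, not something taken from this patch):

```java
import java.math.BigDecimal;

public class NumAsStringDemo {
    public static void main(String[] args) {
        // 9007199254740993 == 2^53 + 1: the smallest positive integer that an
        // IEEE-754 double cannot represent exactly.
        String jsonNumber = "9007199254740993";

        // Route the literal through a double, as a default JSON number decode would:
        long viaDouble = (long) Double.parseDouble(jsonNumber);
        System.out.println(viaDouble);   // 9007199254740992 -- last digit lost

        // Keep the literal as a string and convert exactly:
        long viaString = new BigDecimal(jsonNumber).longValueExact();
        System.out.println(viaString);   // 9007199254740993 -- exact
    }
}
```

Keeping the literal in string form until it reaches the typed column is exactly the trade that "num_as_string"="true" makes.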
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index 56dfeb5e99e61b..18240f62a28055 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
@@ -73,6 +73,7 @@ under the License.
        [WHERE predicate]
        [DELETE ON label=true]
        [ORDER BY source_sequence]
+        [read_properties]

    Explain:
        file_path:
@@ -132,6 +133,34 @@ under the License.
        Only applicable to UNIQUE_KEYS tables: under the same key columns, it guarantees that value columns are REPLACEd according to source_sequence, which can be a column in the data source or a column in the table schema.

+        read_properties:
+
+            Used to specify some special parameters.
+            Syntax:
+            [PROPERTIES ("key"="value", ...)]
+
+            The following parameters can be specified:
+
+            line_delimiter: Used to specify the line delimiter in the load file. The default is `\n`. A combination of multiple characters can be used as the line delimiter.
+
+            fuzzy_parse: Boolean type. If true, the json is parsed with the first row as the schema. Enabling this option can improve the efficiency of json imports, but it requires the keys of all json objects to be in the same order as in the first row. The default value is false. Only used for the json format.
+
+            jsonpaths: There are two ways to import json: simple mode and matched mode.
+                simple mode: used when the jsonpaths parameter is not set. In this mode the json data is required to be of object type. For example:
+                {"k1":1, "k2":2, "k3":"hello"}, where k1, k2, k3 are column names.
+
+                matched mode: used when the json data is relatively complex and the corresponding values need to be matched through the jsonpaths parameter.
+
+            strip_outer_array: Boolean type. If true, the json data starts with an array object and the objects in the array are flattened, one per row. The default value is false. For example:
+                [
+                    {"k1" : 1, "v1" : 2},
+                    {"k1" : 3, "v1" : 4}
+                ]
+                With strip_outer_array set to true, two rows of data are generated when this is imported into Doris.
+
+            json_root: json_root is a valid jsonpath string that specifies the root node of the json document. The default value is "".
+
+            num_as_string: Boolean type. If true, numbers are converted to strings when the json data is parsed, and are then imported without loss of precision.
+
    3. broker_name

        The name of the broker to use, which can be viewed with the show broker command.

@@ -506,6 +535,18 @@ under the License.
    )
    with BROKER "hdfs" ("username"="user", "password"="pass");

+    15. Load data from a json file, specifying FORMAT as json explicitly (by default it is inferred from the file suffix), and set parameters for reading the data.
+
+        LOAD LABEL example_db.label9
+        (
+            DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+            INTO TABLE `my_table`
+            FORMAT AS "json"
+            (k1, k2, k3)
+            properties("fuzzy_parse"="true", "strip_outer_array"="true")
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
 ## keyword

 BROKER,LOAD
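To make the `strip_outer_array` semantics above concrete: with the flag on, the outer JSON array is treated as a row container and each element becomes one row. The real flattening happens in the backend's JSON reader; the following standalone Jackson sketch only mirrors the observable behavior:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StripOuterArrayDemo {
    public static void main(String[] args) throws Exception {
        String json = "[{\"k1\": 1, \"v1\": 2}, {\"k1\": 3, \"v1\": 4}]";
        JsonNode root = new ObjectMapper().readTree(json);

        // strip_outer_array=true: emit one row per element of the outer array.
        if (root.isArray()) {
            for (JsonNode row : root) {
                System.out.println("row: k1=" + row.get("k1") + ", v1=" + row.get("v1"));
            }
        }
    }
}
```

Running this prints two rows, matching the two-row outcome described in both documents.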
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index ff7691332c0d65..b1be0b1df37134 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1414,9 +1414,10 @@ data_desc ::=
    where_clause:whereExpr
    delete_on_clause:deleteExpr
    sequence_col_clause:sequenceColName
+    opt_properties:properties
    {:
        RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat,
-                columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName);
+                columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName, properties);
    :}
    | opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName
    opt_negative:isNeg
@@ -1425,9 +1426,10 @@ data_desc ::=
    opt_col_mapping_list:colMappingList
    where_clause:whereExpr
    delete_on_clause:deleteExpr
+    opt_properties:properties
    {:
        RESULT = new DataDescription(tableName, partitionNames, srcTableName, isNeg, colMappingList, whereExpr,
-                mergeType, deleteExpr);
+                mergeType, deleteExpr, properties);
    :}
    ;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index abee1d1b28888f..9f5227571a284e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -120,6 +120,7 @@ public class DataDescription {
    private String jsonRoot = "";
    private boolean fuzzyParse = false;
    private boolean readJsonByLine = false;
+    private boolean numAsString = false;

    private String sequenceCol;

@@ -137,6 +138,7 @@ public class DataDescription {
    private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
    private final Expr deleteCondition;
+    private Map<String, String> properties;

    public DataDescription(String tableName,
                           PartitionNames partitionNames,
@@ -147,7 +149,7 @@ public DataDescription(String tableName,
                           boolean isNegative,
                           List<Expr> columnMappingList) {
        this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null,
-                isNegative, columnMappingList, null, null, LoadTask.MergeType.APPEND, null, null);
+                isNegative, columnMappingList, null, null, LoadTask.MergeType.APPEND, null, null, null);
    }

    public DataDescription(String tableName,
@@ -163,7 +165,8 @@ public DataDescription(String tableName,
                           Expr whereExpr,
                           LoadTask.MergeType mergeType,
                           Expr deleteCondition,
-                          String sequenceColName) {
+                          String sequenceColName,
+                          Map<String, String> properties) {
        this.tableName = tableName;
        this.partitionNames = partitionNames;
        this.filePaths = filePaths;
@@ -179,6 +182,7 @@ public DataDescription(String tableName,
        this.mergeType = mergeType;
        this.deleteCondition = deleteCondition;
        this.sequenceCol = sequenceColName;
+        this.properties = properties;
    }

    // data from table external_hive_table
    public DataDescription(String tableName,
@@ -189,7 +193,8 @@ public DataDescription(String tableName,
                           List<Expr> columnMappingList,
                           Expr whereExpr,
                           LoadTask.MergeType mergeType,
-                          Expr deleteCondition) {
+                          Expr deleteCondition,
+                          Map<String, String> properties) {
        this.tableName = tableName;
        this.partitionNames = partitionNames;
        this.filePaths = null;
@@ -204,6 +209,7 @@ public DataDescription(String tableName,
        this.srcTableName = srcTableName;
        this.mergeType = mergeType;
        this.deleteCondition = deleteCondition;
+        this.properties = properties;
    }

    public static void validateMappingFunction(String functionName, List<String> args,
@@ -499,12 +505,12 @@ public void setFuzzyParse(boolean fuzzyParse) {
        this.fuzzyParse = fuzzyParse;
    }

-    public boolean isReadJsonByLine() {
-        return readJsonByLine;
+    public boolean isNumAsString() {
+        return numAsString;
    }

-    public void setReadJsonByLine(boolean readJsonByLine) {
-        this.readJsonByLine = readJsonByLine;
+    public void setNumAsString(boolean numAsString) {
+        this.numAsString = numAsString;
    }

    public String getJsonPaths() {
@@ -755,6 +761,36 @@ private void analyzeSequenceCol(String fullDbName) throws AnalysisException {
        }
    }

+    private void analyzeProperties() throws AnalysisException {
+        Map<String, String> analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        analysisMap.putAll(properties);
+
+        if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)) {
+            lineDelimiter = new Separator(analysisMap.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER));
+            lineDelimiter.analyze();
+        }
+
+        if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE)) {
+            fuzzyParse = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE));
+        }
+
+        if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY)) {
+            stripOuterArray = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY));
+        }
+
+        if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_JSONPATHS)) {
+            jsonPaths = analysisMap.get(LoadStmt.KEY_IN_PARAM_JSONPATHS);
+        }
+
+        if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_JSONROOT)) {
+            jsonRoot = analysisMap.get(LoadStmt.KEY_IN_PARAM_JSONROOT);
+        }
+
+        if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_NUM_AS_STRING)) {
+            numAsString = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_NUM_AS_STRING));
+        }
+    }
+
    private void checkLoadPriv(String fullDbName) throws AnalysisException {
        if (Strings.isNullOrEmpty(tableName)) {
            throw new AnalysisException("No table name in load statement.");
        }
@@ -817,6 +853,10 @@ public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException
        analyzeColumns();
        analyzeMultiLoadColumns();
        analyzeSequenceCol(fullDbName);
+
+        if (properties != null) {
+            analyzeProperties();
+        }
    }

    /*
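Two details of `analyzeProperties()` above are easy to miss: key lookup is case-insensitive because the user-supplied map is copied into a `TreeMap` ordered by `String.CASE_INSENSITIVE_ORDER`, and every boolean option goes through `Boolean.parseBoolean`, which silently maps anything other than "true" (in any letter case) to false. A standalone sketch, assuming Guava on the classpath as in the FE:

```java
import com.google.common.collect.Maps;

import java.util.TreeMap;

public class ReadPropertiesLookupDemo {
    public static void main(String[] args) {
        // Mirrors analyzeProperties(): copy user properties into a
        // case-insensitive TreeMap before looking up option keys.
        TreeMap<String, String> analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
        analysisMap.put("Strip_Outer_Array", "true");
        analysisMap.put("num_as_string", "TRUE");
        analysisMap.put("fuzzy_parse", "1");

        System.out.println(analysisMap.containsKey("strip_outer_array"));           // true
        System.out.println(Boolean.parseBoolean(analysisMap.get("NUM_AS_STRING"))); // true
        System.out.println(Boolean.parseBoolean(analysisMap.get("fuzzy_parse")));   // false: "1" is not "true"
    }
}
```

So "fuzzy_parse"="1" is accepted without error but quietly disables the option; only "true"/"false" spellings behave as users expect.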
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 29156183220e01..efcfe5b894a3b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -108,6 +108,7 @@ public class LoadStmt extends DdlStmt {
    public static final String KEY_IN_PARAM_JSONROOT = "json_root";
    public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = "strip_outer_array";
    public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse";
+    public static final String KEY_IN_PARAM_NUM_AS_STRING = "num_as_string";
    public static final String KEY_IN_PARAM_MERGE_TYPE = "merge_type";
    public static final String KEY_IN_PARAM_DELETE_CONDITION = "delete";
    public static final String KEY_IN_PARAM_FUNCTION_COLUMN = "function_column";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 8c177c8b81e498..bc03e6c24e307d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -103,6 +103,7 @@ public class BrokerFileGroup implements Writable {
    private String jsonRoot = "";
    private boolean fuzzyParse = true;
    private boolean readJsonByLine = false;
+    private boolean numAsString = false;

    // for unit test and edit log persistence
    private BrokerFileGroup() {
@@ -237,7 +238,9 @@ public void parse(Database db, DataDescription dataDescription) throws DdlExcept
            jsonPaths = dataDescription.getJsonPaths();
            jsonRoot = dataDescription.getJsonRoot();
            fuzzyParse = dataDescription.isFuzzyParse();
-            readJsonByLine = dataDescription.isReadJsonByLine();
+            // For broker load, we only support reading json format data line by line, so we set readJsonByLine to true here.
+            readJsonByLine = true;
+            numAsString = dataDescription.isNumAsString();
        }
    }

@@ -357,6 +360,14 @@ public void setReadJsonByLine(boolean readJsonByLine) {
        this.readJsonByLine = readJsonByLine;
    }

+    public boolean isNumAsString() {
+        return numAsString;
+    }
+
+    public void setNumAsString(boolean numAsString) {
+        this.numAsString = numAsString;
+    }
+
    public String getJsonPaths() {
        return jsonPaths;
    }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 23c68faf6b4cd9..e3df23e5629c0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -445,6 +445,7 @@ private void processFileGroup(
                rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths());
                rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot());
                rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse());
+                rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
                rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
            }
            brokerScanRange(curLocations).addToRanges(rangeDesc);
@@ -471,6 +472,8 @@ private void processFileGroup(
                rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths());
                rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot());
                rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse());
+                rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
+                rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
            }
            brokerScanRange(curLocations).addToRanges(rangeDesc);
            curFileOffset = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index 7ddd39f64ede6a..db08a57820f095 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -509,7 +509,7 @@ public DataDescription toDataDesc() throws DdlException {
            }
            DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator,
                    fileFormat, null, isNegative, null, null, whereExpr, mergeType, deleteCondition,
-                    sequenceColName);
+                    sequenceColName, null);
            dataDescription.setColumnDef(colString);
            backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
            if (backend == null) {
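On the `readJsonByLine = true` comment in `BrokerFileGroup.parse()` above: "reading json line by line" means each input line holds one self-contained JSON object, so a large file can be split and parsed incrementally. A rough Jackson-based illustration of that input shape (the actual parsing lives in the BE, not in Java):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.BufferedReader;
import java.io.StringReader;

public class ReadJsonByLineDemo {
    public static void main(String[] args) throws Exception {
        // One self-contained JSON object per line.
        String file = "{\"k1\": 1, \"v1\": 2}\n{\"k1\": 3, \"v1\": 4}\n";
        ObjectMapper mapper = new ObjectMapper();
        try (BufferedReader reader = new BufferedReader(new StringReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                JsonNode row = mapper.readTree(line);   // each line parses independently
                System.out.println("row: " + row);
            }
        }
    }
}
```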
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
index 03f21cc59f5eae..85395ebcf2debd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
@@ -30,12 +30,14 @@ import org.apache.doris.system.SystemInfoService;

 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;

 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

 import java.util.List;
+import java.util.Map;

 import mockit.Expectations;
 import mockit.Injectable;
@@ -119,7 +121,7 @@ public void testNormal() throws AnalysisException {

        Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1));
        desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
-                Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
+                Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null, null);
        desc.analyze("testDb");
        Assert.assertEquals("MERGE DATA INFILE ('abc.txt') INTO TABLE testTable COLUMNS TERMINATED BY ',' (col1, col2) WHERE 1 = 1 DELETE ON 1 = 1", desc.toString());
        Assert.assertEquals("1 = 1", desc.getWhereExpr().toSql());
@@ -202,10 +204,30 @@ public void testNormal() throws AnalysisException {
                new FunctionCallExpr("bitmap_dict", params));
        desc = new DataDescription("testTable", new PartitionNames(false, Lists.newArrayList("p1", "p2")),
                "testHiveTable", false, Lists.newArrayList(predicate),
-                null, LoadTask.MergeType.APPEND, null);
+                null, LoadTask.MergeType.APPEND, null, null);
        desc.analyze("testDb");
        sql = "APPEND DATA FROM TABLE testHiveTable INTO TABLE testTable PARTITIONS (p1, p2) SET (`k1` = bitmap_dict(`k2`))";
        Assert.assertEquals(sql, desc.toSql());
+
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("line_delimiter", "abc");
+        properties.put("fuzzy_parse", "true");
+        properties.put("strip_outer_array", "true");
+        properties.put("jsonpaths", "[\"$.h1.h2.k1\",\"$.h1.h2.v1\",\"$.h1.h2.v2\"]");
+        properties.put("json_root", "$.RECORDS");
+        properties.put("read_json_by_line", "true");
+        properties.put("num_as_string", "true");
+        desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
+                Lists.newArrayList("col1", "col2"), new Separator(","), "json", null, false, null,
+                null, null, LoadTask.MergeType.APPEND, null, null, properties);
+
+        desc.analyze("testDb");
+        Assert.assertEquals("abc", desc.getLineDelimiter());
+        Assert.assertTrue(desc.isFuzzyParse());
+        Assert.assertTrue(desc.isStripOuterArray());
+        Assert.assertEquals("[\"$.h1.h2.k1\",\"$.h1.h2.v1\",\"$.h1.h2.v2\"]", desc.getJsonPaths());
+        Assert.assertEquals("$.RECORDS", desc.getJsonRoot());
+        Assert.assertTrue(desc.isNumAsString());
    }

    @Test(expected = AnalysisException.class)
@@ -220,7 +242,7 @@ public void testNegMerge() throws AnalysisException {

        Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1));
        DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
-                Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
+                Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null, null);
        desc.analyze("testDb");
    }

@@ -312,7 +334,7 @@ public void testAnalyzeColumnsWithDuplicatedColumnMapping(@Injectable BinaryPred
    public void testAnalyzeSequenceColumnNormal() throws AnalysisException {
        DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
                Lists.newArrayList("k1", "k2", "source_sequence", "v1"), new Separator("\t"),
-                null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence");
+                null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence", null);
        new Expectations() {
            {
                tbl.getName();
@@ -331,7 +353,7 @@ public void testAnalyzeSequenceColumnNormal() throws AnalysisException {
    public void testAnalyzeSequenceColumnWithoutSourceSequence() throws AnalysisException {
        DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
                Lists.newArrayList("k1", "k2", "v1"), new Separator("\t"),
-                null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence");
+                null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence", null);
        new Expectations() {
            {
                tbl.getName();