From e4ab5e09ca8aba2881df11162f7a1f65d63cd904 Mon Sep 17 00:00:00 2001
From: Tiewei Fang
Date: Thu, 29 May 2025 09:59:39 +0800
Subject: [PATCH 1/4] [feat](refactor-param) refactor brokerLoad's code about fileformat (#50882)

Issue Number: #50238

Problem Summary: #50225 refactored the fileFormat attribute by introducing the FileFormatProperties classes, but it only added the new code and did not migrate the business code that consumes it. This pull request migrates the file-format-related parts of the BrokerLoad feature onto FileFormatProperties (a short usage sketch, based on the updated tests, is appended after the patch).
---
 .../org/apache/doris/analysis/CopyStmt.java | 3 +-
 .../doris/analysis/DataDescription.java | 365 ++++++------------
 .../doris/datasource/FileGroupInfo.java | 18 +-
 .../doris/datasource/LoadScanProvider.java | 48 +--
 .../fileformat/ArrowFileFormatProperties.java | 49 +++
 .../fileformat/CsvFileFormatProperties.java | 20 +-
 .../fileformat/FileFormatProperties.java | 10 +-
 .../fileformat/JsonFileFormatProperties.java | 61 ++-
 .../apache/doris/load/BrokerFileGroup.java | 113 +-----
 .../doris/load/loadv2/MysqlLoadManager.java | 43 +--
 .../load/loadv2/SparkLoadPendingTask.java | 14 +-
 .../load/routineload/RoutineLoadJob.java | 4 +-
 .../org/apache/doris/qe/MultiLoadMgr.java | 18 +-
 .../doris/analysis/DataDescriptionTest.java | 26 +-
 .../JsonFileFormatPropertiesTest.java | 2 +
 .../load/loadv2/SparkLoadPendingTaskTest.java | 4 +
 16 files changed, 303 insertions(+), 495 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ArrowFileFormatProperties.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
index 6bd4d3506d1a2b..c47c66a62e7ff3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.property.constants.BosProperties;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
 import org.apache.doris.load.loadv2.LoadTask.MergeType;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
@@ -147,11 +148,11 @@ private void analyze(String user, String db, boolean checkAuth) throws AnalysisE
 getOrigStmt() != null ?
getOrigStmt().originStmt : "", copyFromParam.getFileColumns(), copyFromParam.getColumnMappingList(), copyFromParam.getFileFilterExpr()); } + dataDescProperties.put(FileFormatProperties.PROP_COMPRESS_TYPE, copyIntoProperties.getCompression()); dataDescription = new DataDescription(tableName.getTbl(), null, Lists.newArrayList(filePath), copyFromParam.getFileColumns(), separator, fileFormatStr, null, false, copyFromParam.getColumnMappingList(), copyFromParam.getFileFilterExpr(), null, MergeType.APPEND, null, null, dataDescProperties); - dataDescription.setCompressType(StageUtil.parseCompressType(copyIntoProperties.getCompression())); if (!(copyFromParam.getColumnMappingList() == null || copyFromParam.getColumnMappingList().isEmpty())) { dataDescription.setIgnoreCsvRedundantCol(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 65a54691ffac7d..d562fba0172544 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -29,15 +29,16 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Pair; -import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties; +import org.apache.doris.datasource.property.fileformat.FileFormatProperties; +import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.LoadTaskInfo; -import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueKeyUpdateMode; @@ -52,7 +53,6 @@ import org.apache.logging.log4j.Logger; import java.io.StringReader; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -99,14 +99,13 @@ public class DataDescription implements InsertStmt.DataDesc { FunctionSet.HLL_HASH, "substitute"); + private static final String DEFAULT_READ_JSON_BY_LINE = "true"; + private final String tableName; private String dbName; private final PartitionNames partitionNames; private final List filePaths; - private final Separator columnSeparator; - private String fileFormat; - private TFileCompressType compressType = TFileCompressType.UNKNOWN; private boolean clientLocal = false; private final boolean isNegative; // column names in the path @@ -122,18 +121,8 @@ public class DataDescription implements InsertStmt.DataDesc { private List fileFieldNames; // Used for mini load private TNetworkAddress beAddr; - private Separator lineDelimiter; private String columnDef; private long backendId; - private boolean stripOuterArray = false; - private String jsonPaths = ""; - private String jsonRoot = ""; - private boolean fuzzyParse = false; - // the default must be true. - // So that for broker load, this is always true, - // and for stream load, it will set on demand. 
- private boolean readJsonByLine = true; - private boolean numAsString = false; private String sequenceCol; @@ -153,19 +142,19 @@ public class DataDescription implements InsertStmt.DataDesc { private final LoadTask.MergeType mergeType; private final Expr deleteCondition; private final Map properties; - private boolean trimDoubleQuotes = false; private boolean isMysqlLoad = false; - private int skipLines = 0; // use for copy into private boolean ignoreCsvRedundantCol = false; private boolean isAnalyzed = false; - private byte enclose = 0; + TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; - private byte escape = 0; + private FileFormatProperties fileFormatProperties; - TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; + // This map is used to collect information of file format properties. + // The map should be only used in `constructor` and `analyzeWithoutCheckPriv` method. + private Map analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); public DataDescription(String tableName, PartitionNames partitionNames, @@ -220,10 +209,6 @@ public DataDescription(String tableName, this.partitionNames = partitionNames; this.filePaths = filePaths; this.fileFieldNames = columns; - this.columnSeparator = columnSeparator; - this.lineDelimiter = lineDelimiter; - this.fileFormat = fileFormat; - this.compressType = Util.getFileCompressType(compressType); this.columnsFromPath = columnsFromPath; this.isNegative = isNegative; this.columnMappingList = columnMappingList; @@ -234,6 +219,22 @@ public DataDescription(String tableName, this.deleteCondition = deleteCondition; this.sequenceCol = sequenceColName; this.properties = properties; + if (properties != null) { + this.analysisMap.putAll(properties); + } + // the default value of `read_json_by_line` must be true. + // So that for broker load, this is always true, + // and for stream load, it will set on demand. + putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, DEFAULT_READ_JSON_BY_LINE); + if (columnSeparator != null) { + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, columnSeparator.getOriSeparator()); + } + if (lineDelimiter != null) { + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_LINE_DELIMITER, lineDelimiter.getOriSeparator()); + } + putAnalysisMapIfNonNull(FileFormatProperties.PROP_FORMAT, fileFormat); + putAnalysisMapIfNonNull(FileFormatProperties.PROP_COMPRESS_TYPE, compressType); + columnsNameToLowerCase(fileFieldNames); columnsNameToLowerCase(columnsFromPath); } @@ -252,8 +253,6 @@ public DataDescription(String tableName, this.partitionNames = partitionNames; this.filePaths = null; this.fileFieldNames = null; - this.columnSeparator = null; - this.fileFormat = null; this.columnsFromPath = null; this.isNegative = isNegative; this.columnMappingList = columnMappingList; @@ -263,6 +262,13 @@ public DataDescription(String tableName, this.mergeType = mergeType; this.deleteCondition = deleteCondition; this.properties = properties; + if (properties != null) { + this.analysisMap.putAll(properties); + } + // the default value of `read_json_by_line` must be true. + // So that for broker load, this is always true, + // and for stream load, it will set on demand. 
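Every constructor in this file now funnels its legacy positional arguments into the single case-insensitive analysisMap that FileFormatProperties consumes during analysis. The same pattern as a self-contained sketch (the class name and the literal keys are illustrative stand-ins for DataDescription and its PROP_* constants):

    import com.google.common.collect.Maps;

    import java.util.Map;

    class AnalysisMapSketch {
        // Case-insensitive, like DataDescription's
        // Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER).
        private final Map<String, String> analysisMap =
                Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);

        // Mirrors putAnalysisMapIfNonNull: arguments that were not supplied are
        // simply not recorded, so FileFormatProperties can apply its defaults.
        private void putIfNonNull(String key, String value) {
            if (value != null) {
                analysisMap.put(key, value);
            }
        }

        AnalysisMapSketch(Map<String, String> userProperties,
                          String columnSeparator, String fileFormat) {
            if (userProperties != null) {
                analysisMap.putAll(userProperties);
            }
            // Written after putAll on purpose: broker load always reads JSON
            // line by line, so this default overrides user properties.
            putIfNonNull("read_json_by_line", "true");
            putIfNonNull("column_separator", columnSeparator);
            putIfNonNull("format", fileFormat);
        }
    }

Because the explicit arguments are written after the raw property map, they win over user-supplied entries of the same (case-insensitive) name.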
+ putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, DEFAULT_READ_JSON_BY_LINE); } // data desc for mysql client @@ -282,10 +288,6 @@ public DataDescription(TableName tableName, this.filePaths = Lists.newArrayList(file); this.clientLocal = clientLocal; this.fileFieldNames = columns; - this.columnSeparator = columnSeparator; - this.lineDelimiter = lineDelimiter; - this.skipLines = skipLines; - this.fileFormat = null; this.columnsFromPath = null; this.isNegative = false; this.columnMappingList = columnMappingList; @@ -295,6 +297,20 @@ public DataDescription(TableName tableName, this.mergeType = null; this.deleteCondition = null; this.properties = properties; + if (properties != null) { + this.analysisMap.putAll(properties); + } + // the default value of `read_json_by_line` must be true. + // So that for broker load, this is always true, + // and for stream load, it will set on demand. + putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, DEFAULT_READ_JSON_BY_LINE); + if (columnSeparator != null) { + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, columnSeparator.getOriSeparator()); + } + if (lineDelimiter != null) { + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_LINE_DELIMITER, lineDelimiter.getOriSeparator()); + } + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_SKIP_LINES, String.valueOf(skipLines)); this.isMysqlLoad = true; columnsNameToLowerCase(fileFieldNames); } @@ -312,11 +328,6 @@ public DataDescription(String tableName, LoadTaskInfo taskInfo) { } this.fileFieldNames = taskInfo.getColumnExprDescs().getFileColNames(); - this.columnSeparator = taskInfo.getColumnSeparator(); - this.lineDelimiter = taskInfo.getLineDelimiter(); - this.enclose = taskInfo.getEnclose(); - this.escape = taskInfo.getEscape(); - getFileFormatAndCompressType(taskInfo); this.columnsFromPath = null; this.isNegative = taskInfo.getNegative(); this.columnMappingList = taskInfo.getColumnExprDescs().getColumnMappingList(); @@ -326,58 +337,79 @@ public DataDescription(String tableName, LoadTaskInfo taskInfo) { this.mergeType = taskInfo.getMergeType(); this.deleteCondition = taskInfo.getDeleteCondition(); this.sequenceCol = taskInfo.getSequenceCol(); - this.stripOuterArray = taskInfo.isStripOuterArray(); - this.jsonPaths = taskInfo.getJsonPaths(); - this.jsonRoot = taskInfo.getJsonRoot(); - this.fuzzyParse = taskInfo.isFuzzyParse(); - this.readJsonByLine = taskInfo.isReadJsonByLine(); - this.numAsString = taskInfo.isNumAsString(); + this.properties = Maps.newHashMap(); - this.trimDoubleQuotes = taskInfo.getTrimDoubleQuotes(); - this.skipLines = taskInfo.getSkipLines(); + if (properties != null) { + this.analysisMap.putAll(properties); + } + putAnalysisMapIfNonNull(FileFormatProperties.PROP_FORMAT, getFileFormat(taskInfo)); + if (taskInfo.getCompressType() != null) { + putAnalysisMapIfNonNull(FileFormatProperties.PROP_COMPRESS_TYPE, taskInfo.getCompressType().toString()); + } + if (taskInfo.getColumnSeparator() != null) { + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, + taskInfo.getColumnSeparator().getOriSeparator()); + } + if (taskInfo.getLineDelimiter() != null) { + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_LINE_DELIMITER, + taskInfo.getLineDelimiter().getOriSeparator()); + } + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_ENCLOSE, new String(new byte[] {taskInfo.getEnclose()})); + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_ESCAPE, new String(new byte[] {taskInfo.getEscape()})); + 
putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, + String.valueOf(taskInfo.getTrimDoubleQuotes())); + putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_SKIP_LINES, + String.valueOf(taskInfo.getSkipLines())); + + putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, + String.valueOf(taskInfo.isStripOuterArray())); + putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_JSON_PATHS, taskInfo.getJsonPaths()); + putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_JSON_ROOT, taskInfo.getJsonRoot()); + putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_FUZZY_PARSE, String.valueOf(taskInfo.isFuzzyParse())); + putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, + String.valueOf(taskInfo.isReadJsonByLine())); + putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_NUM_AS_STRING, String.valueOf(taskInfo.isNumAsString())); + this.uniquekeyUpdateMode = taskInfo.getUniqueKeyUpdateMode(); columnsNameToLowerCase(fileFieldNames); } - private void getFileFormatAndCompressType(LoadTaskInfo taskInfo) { + private void putAnalysisMapIfNonNull(String key, String value) { + if (value != null) { + this.analysisMap.put(key, value); + } + } + + private String getFileFormat(LoadTaskInfo taskInfo) { // get file format if (!Strings.isNullOrEmpty(taskInfo.getHeaderType())) { // for "csv_with_name" and "csv_with_name_and_type" - this.fileFormat = taskInfo.getHeaderType(); + return taskInfo.getHeaderType(); } else { TFileFormatType type = taskInfo.getFormatType(); if (Util.isCsvFormat(type)) { // ignore the "compress type" in format, such as FORMAT_CSV_GZ // the compress type is saved in "compressType" - this.fileFormat = "csv"; + return "csv"; } else { switch (type) { case FORMAT_ORC: - this.fileFormat = "orc"; - break; + return "orc"; case FORMAT_PARQUET: - this.fileFormat = "parquet"; - break; + return "parquet"; case FORMAT_JSON: - this.fileFormat = "json"; - break; + return "json"; case FORMAT_TEXT: - this.fileFormat = "hive_text"; - break; + return "hive_text"; case FORMAT_WAL: - this.fileFormat = "wal"; - break; + return "wal"; case FORMAT_ARROW: - this.fileFormat = "arrow"; - break; + return "arrow"; default: - this.fileFormat = "unknown"; - break; + return "unknown"; } } } - // get compress type - this.compressType = taskInfo.getCompressType(); } public static void validateMappingFunction(String functionName, List args, @@ -602,33 +634,10 @@ public List getColumnMappingList() { return columnMappingList; } - public String getFileFormat() { - return fileFormat; - } - - public void setCompressType(TFileCompressType compressType) { - this.compressType = compressType; - } - - public TFileCompressType getCompressType() { - return compressType; - } - public List getColumnsFromPath() { return columnsFromPath; } - public String getColumnSeparator() { - if (columnSeparator == null) { - return null; - } - return columnSeparator.getSeparator(); - } - - public Separator getColumnSeparatorObj() { - return columnSeparator; - } - public boolean isNegative() { return isNegative; } @@ -641,25 +650,6 @@ public void setBeAddr(TNetworkAddress addr) { beAddr = addr; } - public String getLineDelimiter() { - if (lineDelimiter == null) { - return null; - } - return lineDelimiter.getSeparator(); - } - - public Separator getLineDelimiterObj() { - return lineDelimiter; - } - - public byte getEnclose() { - return enclose; - } - - public byte getEscape() { - return escape; - } - public String getSequenceCol() { return sequenceCol; } @@ -688,44 +678,8 @@ public void 
setBackendId(long backendId) { this.backendId = backendId; } - public boolean isStripOuterArray() { - return stripOuterArray; - } - - public void setStripOuterArray(boolean stripOuterArray) { - this.stripOuterArray = stripOuterArray; - } - - public boolean isFuzzyParse() { - return fuzzyParse; - } - - public void setFuzzyParse(boolean fuzzyParse) { - this.fuzzyParse = fuzzyParse; - } - - public boolean isNumAsString() { - return numAsString; - } - - public void setNumAsString(boolean numAsString) { - this.numAsString = numAsString; - } - - public String getJsonPaths() { - return jsonPaths; - } - - public void setJsonPaths(String jsonPaths) { - this.jsonPaths = jsonPaths; - } - - public String getJsonRoot() { - return jsonRoot; - } - - public void setJsonRoot(String jsonRoot) { - this.jsonRoot = jsonRoot; + public FileFormatProperties getFileFormatProperties() { + return fileFormatProperties; } public Map>> getColumnToHadoopFunction() { @@ -756,22 +710,10 @@ public boolean isLoadFromTable() { return !Strings.isNullOrEmpty(srcTableName); } - public boolean isReadJsonByLine() { - return readJsonByLine; - } - - public boolean getTrimDoubleQuotes() { - return trimDoubleQuotes; - } - public Map getProperties() { return properties; } - public int getSkipLines() { - return skipLines; - } - public boolean getIgnoreCsvRedundantCol() { return ignoreCsvRedundantCol; } @@ -981,60 +923,6 @@ private void analyzeSequenceCol(String fullDbName) throws AnalysisException { } } - private void analyzeProperties() throws AnalysisException { - Map analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - analysisMap.putAll(properties); - - // If lineDelimiter had assigned, do not get it from properties again. - if (lineDelimiter == null && analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)) { - lineDelimiter = new Separator(analysisMap.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)); - lineDelimiter.analyze(); - } - - if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE)) { - fuzzyParse = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE)); - } - - if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY)) { - stripOuterArray = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY)); - } - - if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_JSONPATHS)) { - jsonPaths = analysisMap.get(LoadStmt.KEY_IN_PARAM_JSONPATHS); - } - - if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_JSONROOT)) { - jsonRoot = analysisMap.get(LoadStmt.KEY_IN_PARAM_JSONROOT); - } - - if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_NUM_AS_STRING)) { - numAsString = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_NUM_AS_STRING)); - } - - if (analysisMap.containsKey(LoadStmt.KEY_TRIM_DOUBLE_QUOTES)) { - trimDoubleQuotes = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_TRIM_DOUBLE_QUOTES)); - } - if (analysisMap.containsKey(LoadStmt.KEY_SKIP_LINES)) { - skipLines = Integer.parseInt(analysisMap.get(LoadStmt.KEY_SKIP_LINES)); - } - if (analysisMap.containsKey(LoadStmt.KEY_ENCLOSE)) { - String encloseProp = analysisMap.get(LoadStmt.KEY_ENCLOSE); - if (encloseProp.length() == 1) { - enclose = encloseProp.getBytes(StandardCharsets.UTF_8)[0]; - } else { - throw new AnalysisException("enclose must be single-char"); - } - } - if (analysisMap.containsKey(LoadStmt.KEY_ESCAPE)) { - String escapeProp = analysisMap.get(LoadStmt.KEY_ESCAPE); - if (escapeProp.length() == 1) { - escape = escapeProp.getBytes(StandardCharsets.UTF_8)[0]; - } else { - throw new 
AnalysisException("escape must be single-char"); - } - } - } - private void checkLoadPriv(String fullDbName) throws AnalysisException { if (Strings.isNullOrEmpty(tableName)) { throw new AnalysisException("No table name in load statement."); @@ -1063,7 +951,7 @@ private void checkLoadPriv(String fullDbName) throws AnalysisException { // Change all the columns name to lower case, because Doris column is case-insensitive. private void columnsNameToLowerCase(List columns) { - if (columns == null || columns.isEmpty() || "json".equals(this.fileFormat)) { + if (columns == null || columns.isEmpty() || "json".equals(analysisMap.get(FileFormatProperties.PROP_FORMAT))) { return; } for (int i = 0; i < columns.size(); i++) { @@ -1111,15 +999,16 @@ private void checkMergeType() throws AnalysisException { public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException { analyzeFilePaths(); - analyzeLoadAttributes(); + if (partitionNames != null) { + partitionNames.analyze(null); + } analyzeColumns(); analyzeMultiLoadColumns(); analyzeSequenceCol(fullDbName); - if (properties != null) { - analyzeProperties(); - } + fileFormatProperties = FileFormatProperties.createFileFormatProperties(analysisMap); + fileFormatProperties.analyzeFileFormatProperties(analysisMap, false); } private void analyzeFilePaths() throws AnalysisException { @@ -1131,36 +1020,6 @@ private void analyzeFilePaths() throws AnalysisException { } } - private void analyzeLoadAttributes() throws AnalysisException { - if (columnSeparator != null) { - columnSeparator.analyze(); - } - - if (lineDelimiter != null) { - lineDelimiter.analyze(); - } - - if (partitionNames != null) { - partitionNames.analyze(null); - } - - // file format - // note(tsy): for historical reason, file format here must be string type rather than TFileFormatType - if (fileFormat != null) { - if (!fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_PARQUET) - && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV) - && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES) - && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES) - && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ORC) - && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_JSON) - && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_WAL) - && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ARROW) - && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_HIVE_TEXT)) { - throw new AnalysisException("File Format Type " + fileFormat + " is invalid."); - } - } - } - public String toSql() { StringBuilder sb = new StringBuilder(); if (isMysqlLoad) { @@ -1188,14 +1047,20 @@ public String apply(String s) { sb.append(" "); sb.append(partitionNames.toSql()); } - if (columnSeparator != null) { - sb.append(" COLUMNS TERMINATED BY ").append(columnSeparator.toSql()); + if (analysisMap.get(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR) != null) { + sb.append(" COLUMNS TERMINATED BY ") + .append("'") + .append(analysisMap.get(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR)) + .append("'"); } - if (lineDelimiter != null && isMysqlLoad) { - sb.append(" LINES TERMINATED BY ").append(lineDelimiter.toSql()); + if (analysisMap.get(CsvFileFormatProperties.PROP_LINE_DELIMITER) != null && isMysqlLoad) { + sb.append(" LINES TERMINATED BY ") + .append("'") + .append(analysisMap.get(CsvFileFormatProperties.PROP_LINE_DELIMITER)) + .append("'"); } - if (fileFormat != null && !fileFormat.isEmpty()) { - sb.append(" FORMAT AS '" + 
fileFormat + "'"); + if (!Strings.isNullOrEmpty(analysisMap.get(FileFormatProperties.PROP_FORMAT))) { + sb.append(" FORMAT AS '" + analysisMap.get(FileFormatProperties.PROP_FORMAT) + "'"); } if (fileFieldNames != null && !fileFieldNames.isEmpty()) { sb.append(" ("); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java index 54034ee0f650df..c563794dccbc35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java @@ -226,9 +226,11 @@ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context // If any of the file is unsplittable, all files will be treated as unsplittable. boolean isSplittable = true; for (TBrokerFileStatus fileStatus : fileStatuses) { - TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(), + fileStatus.path); TFileCompressType compressType = - Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path); + Util.getOrInferCompressType(context.fileGroup.getFileFormatProperties().getCompressionType(), + fileStatus.path); // Now only support split plain text if (compressType == TFileCompressType.PLAIN && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) @@ -257,10 +259,12 @@ public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateCon TScanRangeLocations locations = newLocations(context.params, brokerDesc, backendPolicy); for (int i : group) { TBrokerFileStatus fileStatus = fileStatuses.get(i); - TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(), + fileStatus.path); context.params.setFormatType(formatType); TFileCompressType compressType = - Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path); + Util.getOrInferCompressType(context.fileGroup.getFileFormatProperties().getCompressionType(), + fileStatus.path); context.params.setCompressType(compressType); List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, context.fileGroup.getColumnNamesFromPath()); @@ -299,10 +303,12 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte long leftBytes = fileStatus.size - curFileOffset; long tmpBytes = curInstanceBytes + leftBytes; // header_type - TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(), + fileStatus.path); context.params.setFormatType(formatType); TFileCompressType compressType = - Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path); + Util.getOrInferCompressType(context.fileGroup.getFileFormatProperties().getCompressionType(), + fileStatus.path); context.params.setCompressType(compressType); List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, context.fileGroup.getColumnNamesFromPath()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java index 4ed45cd60f9f5f..a7b05dcdd507cd 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java @@ -31,7 +31,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.FileFormatConstants; -import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.property.fileformat.FileFormatProperties; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.Load; import org.apache.doris.load.loadv2.LoadTask; @@ -42,7 +42,6 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFileScanSlotInfo; -import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.THdfsParams; import org.apache.doris.thrift.TScanRangeLocations; @@ -86,10 +85,11 @@ public FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) thro ctx.destTupleDescriptor = destTupleDesc; ctx.fileGroup = fileGroupInfo.getFileGroup(); ctx.timezone = analyzer.getTimezone(); + FileFormatProperties fileFormatProperties = fileGroupInfo.getFileGroup().getFileFormatProperties(); TFileScanRangeParams params = new TFileScanRangeParams(); - params.setFormatType(formatType(fileGroupInfo.getFileGroup().getFileFormat())); - params.setCompressType(fileGroupInfo.getFileGroup().getCompressType()); + params.setFormatType(fileFormatProperties.getFileFormatType()); + params.setCompressType(fileFormatProperties.getCompressionType()); params.setStrictMode(fileGroupInfo.isStrictMode()); if (fileGroupInfo.getSequenceMapCol() != null) { params.setSequenceMapCol(fileGroupInfo.getSequenceMapCol()); @@ -100,8 +100,7 @@ public FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) thro .getBackendConfigProperties()); params.setHdfsParams(tHdfsParams); } - TFileAttributes fileAttributes = new TFileAttributes(); - setFileAttributes(ctx.fileGroup, fileAttributes); + TFileAttributes fileAttributes = setFileAttributes(ctx.fileGroup); params.setFileAttributes(fileAttributes); params.setFileType(fileGroupInfo.getFileType()); ctx.params = params; @@ -110,24 +109,11 @@ public FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) thro return ctx; } - public void setFileAttributes(BrokerFileGroup fileGroup, TFileAttributes fileAttributes) { - TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); - textParams.setColumnSeparator(fileGroup.getColumnSeparator()); - textParams.setLineDelimiter(fileGroup.getLineDelimiter()); - textParams.setEnclose(fileGroup.getEnclose()); - textParams.setEscape(fileGroup.getEscape()); - fileAttributes.setTextParams(textParams); - fileAttributes.setStripOuterArray(fileGroup.isStripOuterArray()); - fileAttributes.setJsonpaths(fileGroup.getJsonPaths()); - fileAttributes.setJsonRoot(fileGroup.getJsonRoot()); - fileAttributes.setNumAsString(fileGroup.isNumAsString()); - fileAttributes.setFuzzyParse(fileGroup.isFuzzyParse()); - fileAttributes.setReadJsonByLine(fileGroup.isReadJsonByLine()); - fileAttributes.setReadByColumnDef(true); - fileAttributes.setHeaderType(getHeaderType(fileGroup.getFileFormat())); - fileAttributes.setTrimDoubleQuotes(fileGroup.getTrimDoubleQuotes()); - fileAttributes.setSkipLines(fileGroup.getSkipLines()); - fileAttributes.setIgnoreCsvRedundantCol(fileGroup.getIgnoreCsvRedundantCol()); + public TFileAttributes setFileAttributes(BrokerFileGroup fileGroup) 
{ + TFileAttributes tFileAttributes = fileGroup.getFileFormatProperties().toTFileAttributes(); + tFileAttributes.setReadByColumnDef(true); + tFileAttributes.setIgnoreCsvRedundantCol(fileGroup.getIgnoreCsvRedundantCol()); + return tFileAttributes; } private String getHeaderType(String formatType) { @@ -219,7 +205,7 @@ private void initColumns(FileLoadScanNode.ParamCreateContext context, Analyzer a List srcSlotIds = Lists.newArrayList(); Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds, - formatType(context.fileGroup.getFileFormat()), fileGroupInfo.getHiddenColumns(), + context.fileGroup.getFileFormatProperties().getFileFormatType(), fileGroupInfo.getHiddenColumns(), fileGroupInfo.getUniqueKeyUpdateMode()); int columnCountFromPath = 0; @@ -250,18 +236,6 @@ private boolean shouldAddSequenceColumn(LoadTaskInfo.ImportColumnDescs columnDes .equalsIgnoreCase(Column.DELETE_SIGN); } - private TFileFormatType formatType(String fileFormat) throws UserException { - if (fileFormat == null) { - // get file format by the file path - return TFileFormatType.FORMAT_CSV_PLAIN; - } - TFileFormatType formatType = Util.getFileFormatTypeFromName(fileFormat); - if (formatType == TFileFormatType.FORMAT_UNKNOWN) { - throw new UserException("Not supported file format: " + fileFormat); - } - return formatType; - } - public TableIf getTargetTable() { return fileGroupInfo.getTargetTable(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ArrowFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ArrowFileFormatProperties.java new file mode 100644 index 00000000000000..feb99b28f8c50a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ArrowFileFormatProperties.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.property.fileformat; + +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TResultFileSinkOptions; + +import java.util.Map; + +public class ArrowFileFormatProperties extends FileFormatProperties { + public ArrowFileFormatProperties() { + super(TFileFormatType.FORMAT_ARROW, FileFormatProperties.FORMAT_ARROW); + } + + @Override + public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) { + } + + @Override + public TFileAttributes toTFileAttributes() { + TFileAttributes fileAttributes = new TFileAttributes(); + TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams(); + fileAttributes.setTextParams(fileTextScanRangeParams); + return fileAttributes; + } + + @Override + public void analyzeFileFormatProperties(Map formatProperties, boolean isRemoveOriginProperty) + throws AnalysisException { + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java index 9d2ec68d9a7d45..ac03b14853997d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java @@ -18,7 +18,6 @@ package org.apache.doris.datasource.property.fileformat; import org.apache.doris.analysis.Separator; -import org.apache.doris.catalog.Column; import org.apache.doris.common.util.Util; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; @@ -28,11 +27,9 @@ import org.apache.doris.thrift.TResultFileSinkOptions; import com.google.common.base.Strings; -import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.List; import java.util.Map; public class CsvFileFormatProperties extends FileFormatProperties { @@ -59,13 +56,8 @@ public class CsvFileFormatProperties extends FileFormatProperties { private boolean trimDoubleQuotes; private int skipLines; private byte enclose; - private byte escape; - // used by tvf - // User specified csv columns, it will override columns got from file - private final List csvSchema = Lists.newArrayList(); - String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR; public CsvFileFormatProperties(String formatName) { @@ -103,9 +95,6 @@ public void analyzeFileFormatProperties(Map formatProperties, bo throw new AnalysisException("enclose should not be longer than one byte."); } enclose = (byte) enclosedString.charAt(0); - if (enclose == 0) { - throw new AnalysisException("enclose should not be byte [0]."); - } } String escapeStr = getOrDefault(formatProperties, PROP_ESCAPE, @@ -149,9 +138,8 @@ public TFileAttributes toTFileAttributes() { TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams(); fileTextScanRangeParams.setColumnSeparator(this.columnSeparator); fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter); - if (this.enclose != 0) { - fileTextScanRangeParams.setEnclose(this.enclose); - } + fileTextScanRangeParams.setEnclose(this.enclose); + fileTextScanRangeParams.setEscape(this.escape); fileAttributes.setTextParams(fileTextScanRangeParams); 
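ArrowFileFormatProperties above is the smallest instance of the subclass contract: a factory creates the subclass from the "format" property, analyzeFileFormatProperties validates and consumes the map, and toTFileAttributes emits the thrift scan attributes. A minimal sketch of that flow using the APIs from this patch (the wrapper class and method are hypothetical; in the real code the first two calls sit in DataDescription.analyzeWithoutCheckPriv and the conversion in LoadScanProvider.setFileAttributes):

    import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
    import org.apache.doris.nereids.exceptions.AnalysisException;
    import org.apache.doris.thrift.TFileAttributes;

    import java.util.Map;

    final class FormatLifecycleSketch {
        static TFileAttributes planScanAttributes(Map<String, String> props)
                throws AnalysisException {
            FileFormatProperties format =
                    FileFormatProperties.createFileFormatProperties(props);
            // 'false' keeps the consumed keys in the map, matching the
            // broker-load call site in this patch.
            format.analyzeFileFormatProperties(props, false);
            TFileAttributes attrs = format.toTFileAttributes();
            // Load-specific flags stay with the caller, as in
            // LoadScanProvider.setFileAttributes.
            attrs.setReadByColumnDef(true);
            return attrs;
        }
    }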
fileAttributes.setHeaderType(headerType); fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes); @@ -188,8 +176,4 @@ public byte getEnclose() { public byte getEscape() { return escape; } - - public List getCsvSchema() { - return csvSchema; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java index 840ce971e7b0e9..94ceeb40e75bfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java @@ -72,7 +72,10 @@ public abstract void analyzeFileFormatProperties( public abstract TFileAttributes toTFileAttributes(); public static FileFormatProperties createFileFormatProperties(String formatString) { - switch (formatString) { + if (formatString == null) { + throw new AnalysisException("formatString can not be null"); + } + switch (formatString.toLowerCase()) { case FORMAT_CSV: return new CsvFileFormatProperties(formatString); case FORMAT_HIVE_TEXT: @@ -91,6 +94,8 @@ public static FileFormatProperties createFileFormatProperties(String formatStrin return new AvroFileFormatProperties(); case FORMAT_WAL: return new WalFileFormatProperties(); + case FORMAT_ARROW: + return new ArrowFileFormatProperties(); default: throw new AnalysisException("format:" + formatString + " is not supported."); } @@ -98,8 +103,7 @@ public static FileFormatProperties createFileFormatProperties(String formatStrin public static FileFormatProperties createFileFormatProperties(Map formatProperties) throws AnalysisException { - String formatString = formatProperties.getOrDefault(PROP_FORMAT, "") - .toLowerCase(); + String formatString = formatProperties.getOrDefault(PROP_FORMAT, "csv"); return createFileFormatProperties(formatString); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java index 238844bee22388..77f3691b09fc5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.property.fileformat; +import org.apache.doris.analysis.Separator; import org.apache.doris.common.util.Util; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.thrift.TFileAttributes; @@ -24,6 +25,8 @@ import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TResultFileSinkOptions; +import com.google.common.base.Strings; + import java.util.Map; public class JsonFileFormatProperties extends FileFormatProperties { @@ -41,6 +44,7 @@ public class JsonFileFormatProperties extends FileFormatProperties { private boolean readJsonByLine; private boolean numAsString = false; private boolean fuzzyParse = false; + private String lineDelimiter = CsvFileFormatProperties.DEFAULT_LINE_DELIMITER; public JsonFileFormatProperties() { @@ -50,37 +54,48 @@ public JsonFileFormatProperties() { @Override public void analyzeFileFormatProperties(Map formatProperties, boolean isRemoveOriginProperty) throws AnalysisException { - jsonRoot = getOrDefault(formatProperties, PROP_JSON_ROOT, - "", isRemoveOriginProperty); - 
jsonPaths = getOrDefault(formatProperties, PROP_JSON_PATHS, - "", isRemoveOriginProperty); - readJsonByLine = Boolean.valueOf( - getOrDefault(formatProperties, PROP_READ_JSON_BY_LINE, - "", isRemoveOriginProperty)).booleanValue(); - stripOuterArray = Boolean.valueOf( - getOrDefault(formatProperties, PROP_STRIP_OUTER_ARRAY, - "", isRemoveOriginProperty)).booleanValue(); - numAsString = Boolean.valueOf( - getOrDefault(formatProperties, PROP_NUM_AS_STRING, - "", isRemoveOriginProperty)).booleanValue(); - fuzzyParse = Boolean.valueOf( - getOrDefault(formatProperties, PROP_FUZZY_PARSE, - "", isRemoveOriginProperty)).booleanValue(); - - String compressTypeStr = getOrDefault(formatProperties, PROP_COMPRESS_TYPE, - "UNKNOWN", isRemoveOriginProperty); - compressionType = Util.getFileCompressType(compressTypeStr); + try { + jsonRoot = getOrDefault(formatProperties, PROP_JSON_ROOT, + "", isRemoveOriginProperty); + jsonPaths = getOrDefault(formatProperties, PROP_JSON_PATHS, + "", isRemoveOriginProperty); + readJsonByLine = Boolean.valueOf( + getOrDefault(formatProperties, PROP_READ_JSON_BY_LINE, + "false", isRemoveOriginProperty)).booleanValue(); + stripOuterArray = Boolean.valueOf( + getOrDefault(formatProperties, PROP_STRIP_OUTER_ARRAY, + "", isRemoveOriginProperty)).booleanValue(); + numAsString = Boolean.valueOf( + getOrDefault(formatProperties, PROP_NUM_AS_STRING, + "", isRemoveOriginProperty)).booleanValue(); + fuzzyParse = Boolean.valueOf( + getOrDefault(formatProperties, PROP_FUZZY_PARSE, + "", isRemoveOriginProperty)).booleanValue(); + lineDelimiter = getOrDefault(formatProperties, CsvFileFormatProperties.PROP_LINE_DELIMITER, + CsvFileFormatProperties.DEFAULT_LINE_DELIMITER, isRemoveOriginProperty); + if (Strings.isNullOrEmpty(lineDelimiter)) { + throw new AnalysisException("line_delimiter can not be empty."); + } + lineDelimiter = Separator.convertSeparator(lineDelimiter); + + String compressTypeStr = getOrDefault(formatProperties, PROP_COMPRESS_TYPE, + "UNKNOWN", isRemoveOriginProperty); + compressionType = Util.getFileCompressType(compressTypeStr); + } catch (org.apache.doris.common.AnalysisException e) { + throw new AnalysisException("Analyze file format failed: " + e.getMessage()); + } } @Override public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) { + sinkOptions.setLineDelimiter(lineDelimiter); } @Override public TFileAttributes toTFileAttributes() { TFileAttributes fileAttributes = new TFileAttributes(); TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams(); - fileTextScanRangeParams.setLineDelimiter(CsvFileFormatProperties.DEFAULT_LINE_DELIMITER); + fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter); fileAttributes.setTextParams(fileTextScanRangeParams); fileAttributes.setJsonRoot(jsonRoot); fileAttributes.setJsonpaths(jsonPaths); @@ -114,4 +129,8 @@ public boolean isNumAsString() { public boolean isFuzzyParse() { return fuzzyParse; } + + public String getLineDelimiter() { + return lineDelimiter; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 8605dc59bcc1d4..615fce292631ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -35,8 +35,11 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import 
org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties; +import org.apache.doris.datasource.property.fileformat.FileFormatProperties; +import org.apache.doris.datasource.property.fileformat.OrcFileFormatProperties; +import org.apache.doris.datasource.property.fileformat.ParquetFileFormatProperties; import org.apache.doris.load.loadv2.LoadTask; -import org.apache.doris.thrift.TFileCompressType; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -63,7 +66,6 @@ public class BrokerFileGroup implements Writable { private String lineDelimiter; // fileFormat may be null, which means format will be decided by file's suffix private String fileFormat; - private TFileCompressType compressType = TFileCompressType.UNKNOWN; private boolean isNegative; private List partitionIds; // can be null, means no partition specified private List filePaths; @@ -91,20 +93,9 @@ public class BrokerFileGroup implements Writable { // load from table private long srcTableId = -1; private boolean isLoadFromTable = false; - - private boolean stripOuterArray = false; - private String jsonPaths = ""; - private String jsonRoot = ""; - private boolean fuzzyParse = true; - private boolean readJsonByLine = false; - private boolean numAsString = false; - private boolean trimDoubleQuotes = false; - private int skipLines; private boolean ignoreCsvRedundantCol = false; - private byte enclose; - - private byte escape; + private FileFormatProperties fileFormatProperties; // for unit test and edit log persistence private BrokerFileGroup() { @@ -180,24 +171,13 @@ public void parse(Database db, DataDescription dataDescription) throws DdlExcept olapTable.readUnlock(); } - lineDelimiter = dataDescription.getLineDelimiter(); - if (lineDelimiter == null) { - lineDelimiter = "\n"; + fileFormatProperties = dataDescription.getFileFormatProperties(); + fileFormat = fileFormatProperties.getFormatName(); + if (fileFormatProperties instanceof CsvFileFormatProperties) { + columnSeparator = ((CsvFileFormatProperties) fileFormatProperties).getColumnSeparator(); + lineDelimiter = ((CsvFileFormatProperties) fileFormatProperties).getLineDelimiter(); } - enclose = dataDescription.getEnclose(); - escape = dataDescription.getEscape(); - - fileFormat = dataDescription.getFileFormat(); - columnSeparator = dataDescription.getColumnSeparator(); - if (columnSeparator == null) { - if (fileFormat != null && fileFormat.equalsIgnoreCase("hive_text")) { - columnSeparator = "\001"; - } else { - columnSeparator = "\t"; - } - } - compressType = dataDescription.getCompressType(); isNegative = dataDescription.isNegative(); // FilePath @@ -227,46 +207,12 @@ public void parse(Database db, DataDescription dataDescription) throws DdlExcept srcTableId = srcTable.getId(); isLoadFromTable = true; } - stripOuterArray = dataDescription.isStripOuterArray(); - jsonPaths = dataDescription.getJsonPaths(); - jsonRoot = dataDescription.getJsonRoot(); - fuzzyParse = dataDescription.isFuzzyParse(); - // ATTN: for broker load, we only support reading json format data line by line, - // so if this is set to false, it must be stream load. 
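Callers that need CSV-only settings (separators, enclose, skip lines) now reach them through an instanceof downcast, as parse() above does and as MysqlLoadManager and SparkLoadPendingTask do later in this patch. The fallback pattern in isolation (the helper class is hypothetical):

    import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
    import org.apache.doris.datasource.property.fileformat.FileFormatProperties;

    final class CsvSettingsSketch {
        // Start from the CSV defaults and override only when the format
        // really is CSV, as SparkLoadPendingTask.createEtlFileGroup does.
        static String columnSeparatorOf(FileFormatProperties props) {
            if (props instanceof CsvFileFormatProperties) {
                return ((CsvFileFormatProperties) props).getColumnSeparator();
            }
            return CsvFileFormatProperties.DEFAULT_COLUMN_SEPARATOR;
        }
    }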
- readJsonByLine = dataDescription.isReadJsonByLine(); - numAsString = dataDescription.isNumAsString(); - trimDoubleQuotes = dataDescription.getTrimDoubleQuotes(); - skipLines = dataDescription.getSkipLines(); } public long getTableId() { return tableId; } - public String getColumnSeparator() { - return columnSeparator; - } - - public String getLineDelimiter() { - return lineDelimiter; - } - - public byte getEnclose() { - return enclose; - } - - public byte getEscape() { - return escape; - } - - public String getFileFormat() { - return fileFormat; - } - - public TFileCompressType getCompressType() { - return compressType; - } - public boolean isNegative() { return isNegative; } @@ -339,44 +285,13 @@ public void setFileSize(List fileSize) { this.fileSize = fileSize; } - public boolean isStripOuterArray() { - return stripOuterArray; - } - - public boolean isFuzzyParse() { - return fuzzyParse; - } - - public boolean isReadJsonByLine() { - return readJsonByLine; - } - - public boolean isNumAsString() { - return numAsString; - } - - public String getJsonPaths() { - return jsonPaths; - } - - public String getJsonRoot() { - return jsonRoot; - } - public boolean isBinaryFileFormat() { - if (fileFormat == null) { - // null means default: csv - return false; - } - return fileFormat.equalsIgnoreCase("parquet") || fileFormat.equalsIgnoreCase("orc"); - } - - public boolean getTrimDoubleQuotes() { - return trimDoubleQuotes; + return fileFormatProperties instanceof ParquetFileFormatProperties + || fileFormatProperties instanceof OrcFileFormatProperties; } - public int getSkipLines() { - return skipLines; + public FileFormatProperties getFileFormatProperties() { + return fileFormatProperties; } public boolean getIgnoreCsvRedundantCol() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index 68dffbfb3e3dfe..1174c9559410af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -31,6 +31,8 @@ import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.io.ByteBufferNetworkInputStream; +import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties; +import org.apache.doris.datasource.property.fileformat.FileFormatProperties; import org.apache.doris.load.LoadJobRowResult; import org.apache.doris.load.StreamLoadHandler; import org.apache.doris.mysql.MysqlSerializer; @@ -347,6 +349,7 @@ public HttpPut generateRequestForMySqlLoad( httpPut.addHeader("token", token); Map props = desc.getProperties(); + FileFormatProperties fileFormatProperties = desc.getFileFormatProperties(); if (props != null) { // max_filter_ratio if (props.containsKey(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO)) { @@ -378,38 +381,20 @@ public HttpPut generateRequestForMySqlLoad( httpPut.addHeader(LoadStmt.TIMEZONE, timezone); } - // trim quotes - if (props.containsKey(LoadStmt.KEY_TRIM_DOUBLE_QUOTES)) { - String trimQuotes = props.get(LoadStmt.KEY_TRIM_DOUBLE_QUOTES); - httpPut.addHeader(LoadStmt.KEY_TRIM_DOUBLE_QUOTES, trimQuotes); + if (fileFormatProperties instanceof CsvFileFormatProperties) { + CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) fileFormatProperties; + httpPut.addHeader(LoadStmt.KEY_TRIM_DOUBLE_QUOTES, + String.valueOf(csvFileFormatProperties.isTrimDoubleQuotes())); + 
httpPut.addHeader(LoadStmt.KEY_ENCLOSE, new String(new byte[]{csvFileFormatProperties.getEnclose()})); + httpPut.addHeader(LoadStmt.KEY_ESCAPE, new String(new byte[]{csvFileFormatProperties.getEscape()})); } - - // enclose - if (props.containsKey(LoadStmt.KEY_ENCLOSE)) { - String enclose = props.get(LoadStmt.KEY_ENCLOSE); - httpPut.addHeader(LoadStmt.KEY_ENCLOSE, enclose); - } - - //escape - if (props.containsKey(LoadStmt.KEY_ESCAPE)) { - String escape = props.get(LoadStmt.KEY_ESCAPE); - httpPut.addHeader(LoadStmt.KEY_ESCAPE, escape); - } - } - - // skip_lines - if (desc.getSkipLines() != 0) { - httpPut.addHeader(LoadStmt.KEY_SKIP_LINES, Integer.toString(desc.getSkipLines())); - } - - // column_separator - if (desc.getColumnSeparator() != null) { - httpPut.addHeader(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR, desc.getColumnSeparator()); } - // line_delimiter - if (desc.getLineDelimiter() != null) { - httpPut.addHeader(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER, desc.getLineDelimiter()); + if (fileFormatProperties instanceof CsvFileFormatProperties) { + CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) fileFormatProperties; + httpPut.addHeader(LoadStmt.KEY_SKIP_LINES, Integer.toString(csvFileFormatProperties.getSkipLines())); + httpPut.addHeader(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR, csvFileFormatProperties.getColumnSeparator()); + httpPut.addHeader(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER, csvFileFormatProperties.getLineDelimiter()); } // columns diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index 32749fd8a774f0..b26603248d66d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -45,6 +45,8 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.MetaLockUtils; +import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties; +import org.apache.doris.datasource.property.fileformat.FileFormatProperties; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey; import org.apache.doris.load.FailMsg; @@ -519,13 +521,21 @@ private EtlFileGroup createEtlFileGroup(BrokerFileGroup fileGroup, Set tab } EtlFileGroup etlFileGroup = null; + FileFormatProperties fileFormatProperties = fileGroup.getFileFormatProperties(); if (fileGroup.isLoadFromTable()) { etlFileGroup = new EtlFileGroup(SourceType.HIVE, hiveDbTableName, hiveTableProperties, fileGroup.isNegative(), columnMappings, where, partitionIds); } else { + String columnSeparator = CsvFileFormatProperties.DEFAULT_COLUMN_SEPARATOR; + String lineDelimiter = CsvFileFormatProperties.DEFAULT_LINE_DELIMITER; + if (fileFormatProperties instanceof CsvFileFormatProperties) { + columnSeparator = ((CsvFileFormatProperties) fileFormatProperties).getColumnSeparator(); + lineDelimiter = ((CsvFileFormatProperties) fileFormatProperties).getLineDelimiter(); + } etlFileGroup = new EtlFileGroup(SourceType.FILE, fileGroup.getFilePaths(), fileFieldNames, - fileGroup.getColumnNamesFromPath(), fileGroup.getColumnSeparator(), fileGroup.getLineDelimiter(), - fileGroup.isNegative(), fileGroup.getFileFormat(), columnMappings, where, partitionIds); + fileGroup.getColumnNamesFromPath(), columnSeparator, lineDelimiter, + fileGroup.isNegative(), 
fileFormatProperties.getFormatName(), + columnMappings, where, partitionIds); } return etlFileGroup; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 463b1dd4559ef3..bffc583969b22c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -387,8 +387,8 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { if (fileFormatProperties instanceof CsvFileFormatProperties) { CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) fileFormatProperties; jobProperties.put(FileFormatProperties.PROP_FORMAT, "csv"); - jobProperties.put(LoadStmt.KEY_ENCLOSE, String.valueOf(csvFileFormatProperties.getEnclose())); - jobProperties.put(LoadStmt.KEY_ESCAPE, String.valueOf(csvFileFormatProperties.getEscape())); + jobProperties.put(LoadStmt.KEY_ENCLOSE, new String(new byte[]{csvFileFormatProperties.getEnclose()})); + jobProperties.put(LoadStmt.KEY_ESCAPE, new String(new byte[]{csvFileFormatProperties.getEscape()})); this.enclose = csvFileFormatProperties.getEnclose(); this.escape = csvFileFormatProperties.getEscape(); } else if (fileFormatProperties instanceof JsonFileFormatProperties) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index 63f6de760cb102..53862be97c8e28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -447,10 +447,6 @@ public DataDescription toDataDesc() throws DdlException { String sequenceColName = properties.get(LoadStmt.KEY_IN_PARAM_SEQUENCE_COL); String colString = null; Backend backend = null; - boolean stripOuterArray = false; - String jsonPaths = ""; - String jsonRoot = ""; - boolean fuzzyParse = false; if (properties != null) { colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS); String columnSeparatorStr = properties.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR); @@ -483,18 +479,10 @@ public DataDescription toDataDesc() throws DdlException { if (properties.get(LoadStmt.KEY_IN_PARAM_DELETE_CONDITION) != null) { deleteCondition = parseWhereExpr(properties.get(LoadStmt.KEY_IN_PARAM_DELETE_CONDITION)); } - if (fileFormat != null && fileFormat.equalsIgnoreCase("json")) { - stripOuterArray = Boolean.valueOf( - properties.getOrDefault(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY, "false")); - jsonPaths = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONPATHS, ""); - jsonRoot = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONROOT, ""); - fuzzyParse = Boolean.valueOf( - properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false")); - } } DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator, fileFormat, null, isNegative, null, null, whereExpr, mergeType, deleteCondition, - sequenceColName, null); + sequenceColName, properties); dataDescription.setColumnDef(colString); backend = Env.getCurrentSystemInfo().getBackend(backendId); if (backend == null) { @@ -503,10 +491,6 @@ public DataDescription toDataDesc() throws DdlException { dataDescription.setBeAddr(new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort())); dataDescription.setFileSize(fileSizes); dataDescription.setBackendId(backendId); - dataDescription.setJsonPaths(jsonPaths); - 
dataDescription.setJsonRoot(jsonRoot); - dataDescription.setStripOuterArray(stripOuterArray); - dataDescription.setFuzzyParse(fuzzyParse); return dataDescription; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java index b6a0aa2233c5cc..d52b24e1b550bb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java @@ -25,6 +25,8 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties; +import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.MockedAuth; @@ -116,21 +118,23 @@ public void testNormal() throws AnalysisException { desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), Lists.newArrayList("col1", "col2"), null, null, true, null); desc.analyze("testDb"); + CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) desc.getFileFormatProperties(); Assert.assertEquals("APPEND DATA INFILE ('abc.txt') NEGATIVE INTO TABLE testTable (col1, col2)", desc.toString()); Assert.assertEquals("testTable", desc.getTableName()); Assert.assertEquals("[col1, col2]", desc.getFileFieldNames().toString()); Assert.assertEquals("[abc.txt]", desc.getFilePaths().toString()); Assert.assertTrue(desc.isNegative()); - Assert.assertNull(desc.getColumnSeparator()); + Assert.assertEquals("\t", csvFileFormatProperties.getColumnSeparator()); Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1)); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null, null); desc.analyze("testDb"); + csvFileFormatProperties = (CsvFileFormatProperties) desc.getFileFormatProperties(); Assert.assertEquals("MERGE DATA INFILE ('abc.txt') INTO TABLE testTable COLUMNS TERMINATED BY ',' FORMAT AS 'csv' (col1, col2) WHERE (1 = 1) DELETE ON (1 = 1)", desc.toString()); Assert.assertEquals("(1 = 1)", desc.getWhereExpr().toSql()); Assert.assertEquals("(1 = 1)", desc.getDeleteCondition().toSql()); - Assert.assertEquals(",", desc.getColumnSeparator()); + Assert.assertEquals(",", csvFileFormatProperties.getColumnSeparator()); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"), Lists.newArrayList("col1", "col2"), new Separator("\t"), @@ -226,12 +230,13 @@ public void testNormal() throws AnalysisException { null, null, LoadTask.MergeType.APPEND, null, null, properties); desc.analyze("testDb"); - Assert.assertEquals("abc", desc.getLineDelimiter()); - Assert.assertTrue(desc.isFuzzyParse()); - Assert.assertTrue(desc.isStripOuterArray()); - Assert.assertEquals("[\"$.h1.h2.k1\",\"$.h1.h2.v1\",\"$.h1.h2.v2\"]", desc.getJsonPaths()); - Assert.assertEquals("$.RECORDS", desc.getJsonRoot()); - Assert.assertTrue(desc.isNumAsString()); + JsonFileFormatProperties jsonFileFormatProperties = (JsonFileFormatProperties) desc.getFileFormatProperties(); + Assert.assertEquals("abc", 
jsonFileFormatProperties.getLineDelimiter()); + Assert.assertTrue(jsonFileFormatProperties.isFuzzyParse()); + Assert.assertTrue(jsonFileFormatProperties.isStripOuterArray()); + Assert.assertEquals("[\"$.h1.h2.k1\",\"$.h1.h2.v1\",\"$.h1.h2.v2\"]", jsonFileFormatProperties.getJsonPaths()); + Assert.assertEquals("$.RECORDS", jsonFileFormatProperties.getJsonRoot()); + Assert.assertTrue(jsonFileFormatProperties.isNumAsString()); } @Test(expected = AnalysisException.class) @@ -401,8 +406,9 @@ public void testMysqlLoadData() throws AnalysisException { Assert.assertEquals("p1", desc.getPartitionNames().getPartitionNames().get(0)); Assert.assertEquals("p2", desc.getPartitionNames().getPartitionNames().get(1)); - Assert.assertEquals("040506", desc.getLineDelimiter()); - Assert.assertEquals("010203", desc.getColumnSeparator()); + CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) desc.getFileFormatProperties(); + Assert.assertEquals("040506", csvFileFormatProperties.getLineDelimiter()); + Assert.assertEquals("010203", csvFileFormatProperties.getColumnSeparator()); String sql = "DATA LOCAL INFILE 'abc.txt' " + "INTO TABLE testDb1.testTable " + "PARTITIONS (p1, p2) " diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java index f614d3223866f1..a4b78ccd6422c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java @@ -46,6 +46,8 @@ public void testAnalyzeFileFormatPropertiesEmpty() throws AnalysisException { Assert.assertEquals(false, jsonFileFormatProperties.isReadJsonByLine()); Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString()); Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse()); + Assert.assertEquals(CsvFileFormatProperties.DEFAULT_LINE_DELIMITER, + jsonFileFormatProperties.getLineDelimiter()); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java index f02e29271deb06..332088b080f690 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java @@ -96,7 +96,9 @@ public void testExecuteTask(@Injectable SparkLoadJob sparkLoadJob, List brokerFileGroups = Lists.newArrayList(); DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), null, null, null, false, null); + desc.analyzeWithoutCheckPriv(database.getFullName()); BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc); + brokerFileGroup.parse(database, desc); brokerFileGroups.add(brokerFileGroup); BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null); aggKeyToFileGroups.put(aggKey, brokerFileGroups); @@ -190,7 +192,9 @@ public void testRangePartitionHashDistribution(@Injectable SparkLoadJob sparkLoa List brokerFileGroups = Lists.newArrayList(); DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), null, null, null, false, null); + desc.analyzeWithoutCheckPriv(database.getFullName()); BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc); + 
brokerFileGroup.parse(database, desc); brokerFileGroups.add(brokerFileGroup); BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null); aggKeyToFileGroups.put(aggKey, brokerFileGroups); From ab0ae28351961e286a06dddf1b6e5e6cb2abbf96 Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 12 Jul 2025 14:07:13 -0700 Subject: [PATCH 2/4] fix --- .../apache/doris/analysis/DataDescription.java | 12 +++++++++--- .../fileformat/CsvFileFormatProperties.java | 11 ++++++++--- .../org/apache/doris/qe/AuditLogHelper.java | 17 +++++++---------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index d562fba0172544..d33c10fcd04587 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -43,6 +43,7 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueKeyUpdateMode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -156,6 +157,7 @@ public class DataDescription implements InsertStmt.DataDesc { // The map should be only used in `constructor` and `analyzeWithoutCheckPriv` method. private Map analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + @VisibleForTesting public DataDescription(String tableName, PartitionNames partitionNames, List filePaths, @@ -188,7 +190,7 @@ public DataDescription(String tableName, mergeType, deleteCondition, sequenceColName, properties); } - public DataDescription(String tableName, + private DataDescription(String tableName, PartitionNames partitionNames, List filePaths, List columns, @@ -239,7 +241,7 @@ public DataDescription(String tableName, columnsNameToLowerCase(columnsFromPath); } - // data from table external_hive_table + @VisibleForTesting public DataDescription(String tableName, PartitionNames partitionNames, String srcTableName, @@ -271,7 +273,7 @@ public DataDescription(String tableName, putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, DEFAULT_READ_JSON_BY_LINE); } - // data desc for mysql client + @VisibleForTesting public DataDescription(TableName tableName, PartitionNames partitionNames, String file, @@ -1008,6 +1010,10 @@ public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException analyzeSequenceCol(fullDbName); fileFormatProperties = FileFormatProperties.createFileFormatProperties(analysisMap); + if (ConnectContext.get() != null) { + analysisMap.putIfAbsent(CsvFileFormatProperties.PROP_ENABLE_TEXT_VALIDATE_UTF8, + String.valueOf(ConnectContext.get().getSessionVariable().enableTextValidateUtf8)); + } fileFormatProperties.analyzeFileFormatProperties(analysisMap, false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java index ac03b14853997d..812bda4d1c7947 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java @@ -20,7 +20,6 @@ import org.apache.doris.analysis.Separator; import 
org.apache.doris.common.util.Util; import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileTextScanRangeParams; @@ -50,6 +49,8 @@ public class CsvFileFormatProperties extends FileFormatProperties { public static final String PROP_ENCLOSE = "enclose"; public static final String PROP_ESCAPE = "escape"; + public static final String PROP_ENABLE_TEXT_VALIDATE_UTF8 = "enable_text_validate_utf8"; + private String headerType = ""; private String columnSeparator = DEFAULT_COLUMN_SEPARATOR; private String lineDelimiter = DEFAULT_LINE_DELIMITER; @@ -57,6 +58,7 @@ public class CsvFileFormatProperties extends FileFormatProperties { private int skipLines; private byte enclose; private byte escape; + private boolean enableTextValidateUTF8 = true; String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR; @@ -120,6 +122,10 @@ public void analyzeFileFormatProperties(Map formatProperties, bo PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty); compressionType = Util.getFileCompressType(compressTypeStr); + String validateUtf8 = getOrDefault(formatProperties, PROP_ENABLE_TEXT_VALIDATE_UTF8, "true", + isRemoveOriginProperty); + enableTextValidateUTF8 = Boolean.parseBoolean(validateUtf8); + } catch (org.apache.doris.common.AnalysisException e) { throw new AnalysisException(e.getMessage()); } @@ -144,8 +150,7 @@ public TFileAttributes toTFileAttributes() { fileAttributes.setHeaderType(headerType); fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes); fileAttributes.setSkipLines(skipLines); - fileAttributes.setEnableTextValidateUtf8( - ConnectContext.get().getSessionVariable().enableTextValidateUtf8); + fileAttributes.setEnableTextValidateUtf8(enableTextValidateUTF8); return fileAttributes; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index fb8b227b8b7d89..66670f3230d20a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -349,24 +349,21 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme MetricRepo.COUNTER_QUERY_ALL.increase(1L); MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); } + String physicalClusterName = ""; try { if (Config.isCloudMode()) { cloudCluster = ctx.getCloudCluster(false); + physicalClusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getPhysicalCluster(cloudCluster); + if (!cloudCluster.equals(physicalClusterName)) { + MetricRepo.increaseClusterQueryAll(physicalClusterName); + } } } catch (ComputeGroupException e) { LOG.warn("Failed to get cloud cluster", e); return; } - String physicalClusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) - .getPhysicalCluster(cloudCluster); - if (cloudCluster.equals(physicalClusterName)) { - // not vcg - MetricRepo.increaseClusterQueryAll(cloudCluster); - } else { - // vcg - MetricRepo.increaseClusterQueryAll(cloudCluster); - MetricRepo.increaseClusterQueryAll(physicalClusterName); - } + MetricRepo.increaseClusterQueryAll(cloudCluster); if (!ctx.getState().isInternal()) { if (ctx.getState().getStateType() == MysqlStateType.ERR From c4724abb0a738288e041224dc080e33f70cf6fec Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 12 Jul 2025 14:47:58 -0700 Subject: [PATCH 3/4] fix --- 
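Context for this one-liner: the previous patch in this series narrowed this DataDescription constructor to private, but the restored comment notes it must stay visible to SqlParser.java, the generated parser that instantiates DataDescription directly. In Java, `private` restricts access to the declaring class itself, so even a same-package caller cannot reach a private constructor. A generic sketch of that visibility rule, with made-up class names (not Doris code):

    // Widget's constructor is package-private, so a same-package caller compiles.
    // Switching it to `private` reproduces the breakage this patch undoes:
    // Factory.make() would then fail to compile, just as the generated
    // SqlParser did against a private DataDescription constructor.
    class Widget {
        Widget() {}
    }

    class Factory {
        Widget make() {
            return new Widget();
        }
    }
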
.../main/java/org/apache/doris/analysis/DataDescription.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index d33c10fcd04587..07e3c62756f81b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -190,7 +190,8 @@ public DataDescription(String tableName, mergeType, deleteCondition, sequenceColName, properties); } - private DataDescription(String tableName, + // Visible for SqlParser.java + public DataDescription(String tableName, PartitionNames partitionNames, List filePaths, List columns, From 9a421d295e85a940b2645ffacfb9337dbc52615f Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 13 Jul 2025 08:04:07 +0800 Subject: [PATCH 4/4] fix --- .../org/apache/doris/analysis/DataDescription.java | 4 ---- .../fileformat/CsvFileFormatProperties.java | 14 ++++++++++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 07e3c62756f81b..2dd62cea257f35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -1011,10 +1011,6 @@ public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException analyzeSequenceCol(fullDbName); fileFormatProperties = FileFormatProperties.createFileFormatProperties(analysisMap); - if (ConnectContext.get() != null) { - analysisMap.putIfAbsent(CsvFileFormatProperties.PROP_ENABLE_TEXT_VALIDATE_UTF8, - String.valueOf(ConnectContext.get().getSessionVariable().enableTextValidateUtf8)); - } fileFormatProperties.analyzeFileFormatProperties(analysisMap, false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java index 812bda4d1c7947..25bd0c469dbfe4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.Separator; import org.apache.doris.common.util.Util; import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileTextScanRangeParams; @@ -122,9 +123,17 @@ public void analyzeFileFormatProperties(Map formatProperties, bo PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty); compressionType = Util.getFileCompressType(compressTypeStr); - String validateUtf8 = getOrDefault(formatProperties, PROP_ENABLE_TEXT_VALIDATE_UTF8, "true", + // get ENABLE_TEXT_VALIDATE_UTF8 from properties map first, + // if not exist, try getting from session variable, + // if connection context is null, use "true" as default value. 
+ String validateUtf8 = getOrDefault(formatProperties, PROP_ENABLE_TEXT_VALIDATE_UTF8, "", isRemoveOriginProperty); - enableTextValidateUTF8 = Boolean.parseBoolean(validateUtf8); + if (Strings.isNullOrEmpty(validateUtf8)) { + enableTextValidateUTF8 = ConnectContext.get() == null ? true + : ConnectContext.get().getSessionVariable().enableTextValidateUtf8; + } else { + enableTextValidateUTF8 = Boolean.parseBoolean(validateUtf8); + } } catch (org.apache.doris.common.AnalysisException e) { throw new AnalysisException(e.getMessage()); @@ -182,3 +191,4 @@ public byte getEscape() { return escape; } } +
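
Taken together, patches 2 and 4 settle on this precedence for UTF-8 text validation: an explicit `enable_text_validate_utf8` format property wins; otherwise the session variable applies when a connect context is attached; otherwise the value defaults to true. A minimal, self-contained sketch of that resolution order — the class, method, and parameter names below are invented for illustration and are not Doris API:

    import java.util.Map;
    import java.util.Optional;

    public class Utf8ValidatePrecedence {

        // Mirrors the fallback chain in CsvFileFormatProperties.analyzeFileFormatProperties:
        // 1) an explicit "enable_text_validate_utf8" property, if present and non-empty;
        // 2) otherwise the session variable, when a session is attached;
        // 3) otherwise true.
        static boolean resolve(Map<String, String> formatProperties,
                               Optional<Boolean> sessionEnableTextValidateUtf8) {
            String explicit = formatProperties.getOrDefault("enable_text_validate_utf8", "");
            if (!explicit.isEmpty()) {
                return Boolean.parseBoolean(explicit);
            }
            return sessionEnableTextValidateUtf8.orElse(true);
        }

        public static void main(String[] args) {
            // no property, no session -> default true
            System.out.println(resolve(Map.of(), Optional.empty()));
            // an explicit property beats the session variable
            System.out.println(resolve(Map.of("enable_text_validate_utf8", "false"),
                    Optional.of(true)));
            // no property -> the session variable decides
            System.out.println(resolve(Map.of(), Optional.of(false)));
        }
    }

This also shows why patch 4 reverts patch 2's approach of stuffing the session value into the analysis map up front: resolving lazily inside analyzeFileFormatProperties keeps the property map free of synthesized entries while still honoring the same precedence.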