diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java
new file mode 100644
index 00000000000000..5ded87e0d235a2
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveProperties.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class HiveProperties {
+    public static final String PROP_FIELD_DELIMITER = "field.delim";
+    public static final String PROP_SEPARATOR_CHAR = "separatorChar";
+    public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";
+    public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
+
+    public static final String PROP_LINE_DELIMITER = "line.delim";
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+    public static final String PROP_QUOTE_CHAR = "quoteChar";
+
+    public static final String PROP_COLLECTION_DELIMITER_HIVE2 = "colelction.delim"; // misspelling shipped by Hive 2.x, kept for compatibility
+    public static final String PROP_COLLECTION_DELIMITER_HIVE3 = "collection.delim";
+    public static final String DEFAULT_COLLECTION_DELIMITER = "\2";
+
+    public static final String PROP_MAP_KV_DELIMITER = "mapkey.delim";
+    public static final String DEFAULT_MAP_KV_DELIMITER = "\003";
+
+    public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
+    public static final String DEFAULT_ESCAPE_DELIMITER = "\\";
+
+    public static final String PROP_NULL_FORMAT = "serialization.null.format";
+    public static final String DEFAULT_NULL_FORMAT = "\\N";
+
+    public static final Set<String> HIVE_SERDE_PROPERTIES = ImmutableSet.of(
+            PROP_FIELD_DELIMITER,
+            PROP_COLLECTION_DELIMITER_HIVE2,
+            PROP_COLLECTION_DELIMITER_HIVE3,
+            PROP_SEPARATOR_CHAR,
+            PROP_SERIALIZATION_FORMAT,
+            PROP_LINE_DELIMITER,
+            PROP_QUOTE_CHAR,
+            PROP_MAP_KV_DELIMITER,
+            PROP_ESCAPE_DELIMITER,
+            PROP_NULL_FORMAT
+    );
+
+    public static String getFieldDelimiter(Table table) {
+        // This method is used for the text file format.
+        // For compatibility with the csv file format, use `getColumnSeparator` instead.
+        Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_FIELD_DELIMITER);
+        Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SERIALIZATION_FORMAT);
+        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+                DEFAULT_FIELD_DELIMITER, fieldDelim, serFormat));
+    }
+
+    public static String getColumnSeparator(Table table) {
+        Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_FIELD_DELIMITER);
+        Optional<String> columnSeparator = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SEPARATOR_CHAR);
+        Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SERIALIZATION_FORMAT);
+        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+                DEFAULT_FIELD_DELIMITER, fieldDelim, columnSeparator, serFormat));
+    }
+
+
+    public static String getLineDelimiter(Table table) {
+        Optional<String> lineDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_LINE_DELIMITER);
+        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+                DEFAULT_LINE_DELIMITER, lineDelim));
+    }
+
+    public static String getMapKvDelimiter(Table table) {
+        Optional<String> mapkvDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_MAP_KV_DELIMITER);
+        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+                DEFAULT_MAP_KV_DELIMITER, mapkvDelim));
+    }
+
+    public static String getCollectionDelimiter(Table table) {
+        Optional<String> collectionDelimHive2 = HiveMetaStoreClientHelper.getSerdeProperty(table,
+                PROP_COLLECTION_DELIMITER_HIVE2);
+        Optional<String> collectionDelimHive3 = HiveMetaStoreClientHelper.getSerdeProperty(table,
+                PROP_COLLECTION_DELIMITER_HIVE3);
+        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
+                DEFAULT_COLLECTION_DELIMITER, collectionDelimHive2, collectionDelimHive3));
+    }
+
+    public static Optional<String> getQuoteChar(Table table) {
+        Map<String, String> serdeParams = table.getSd().getSerdeInfo().getParameters();
+        if (serdeParams.containsKey(PROP_QUOTE_CHAR)) {
+            return Optional.of(serdeParams.get(PROP_QUOTE_CHAR));
+        }
+        return Optional.empty();
+    }
+
+    public static Optional<String> getEscapeDelimiter(Table table) {
+        Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_ESCAPE_DELIMITER);
+        if (escapeDelim.isPresent()) {
+            String escape = HiveMetaStoreClientHelper.getByte(escapeDelim.get());
+            if (escape != null) {
+                return Optional.of(escape);
+            } else {
+                return Optional.of(DEFAULT_ESCAPE_DELIMITER);
+            }
+        }
+        return Optional.empty();
+    }
+
+    public static String getNullFormat(Table table) {
+        Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_NULL_FORMAT);
+        return HiveMetaStoreClientHelper.firstPresentOrDefault(DEFAULT_NULL_FORMAT, nullFormat);
+    }
+
+    // Set properties on the table: serde properties go to the serde info, everything else to the table parameters.
+    public static void setTableProperties(Table table, Map<String, String> properties) {
+        HashMap<String, String> serdeProps = new HashMap<>();
+        HashMap<String, String> tblProps = new HashMap<>();
+
+        for (String k : properties.keySet()) {
+            if (HIVE_SERDE_PROPERTIES.contains(k)) {
+                serdeProps.put(k, properties.get(k));
+            } else {
+                tblProps.put(k, properties.get(k));
+            }
+        }
+
+        if (table.getParameters() == null) {
+            table.setParameters(tblProps);
+        } else {
+            table.getParameters().putAll(tblProps);
+        }
+
+        if (table.getSd().getSerdeInfo().getParameters() == null) {
+            table.getSd().getSerdeInfo().setParameters(serdeProps);
+        } else {
+            table.getSd().getSerdeInfo().getParameters().putAll(serdeProps);
+        }
+    }
+}
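
Note: every getter above funnels through the same rule — take the first serde property that is present, otherwise fall back to the built-in default (with `getByte` unescaping sequences such as "\t"). The following is a minimal standalone sketch of that first-present-wins rule; `firstPresentOrDefault` here is a local stand-in for the helper of the same name in HiveMetaStoreClientHelper, whose exact signature is assumed rather than quoted.

    import java.util.Arrays;
    import java.util.Optional;

    class DelimiterResolutionSketch {
        // Stand-in for HiveMetaStoreClientHelper.firstPresentOrDefault (assumed behavior):
        // return the first candidate that is present, otherwise the default.
        @SafeVarargs
        static String firstPresentOrDefault(String def, Optional<String>... candidates) {
            return Arrays.stream(candidates)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .findFirst()
                    .orElse(def);
        }

        public static void main(String[] args) {
            Optional<String> fieldDelim = Optional.of("\t");   // field.delim is set
            Optional<String> separatorChar = Optional.empty(); // separatorChar is not
            Optional<String> serFormat = Optional.of("|");     // serialization.format is set
            // field.delim wins because it is the first present candidate, mirroring getColumnSeparator:
            System.out.println(firstPresentOrDefault("\1", fieldDelim, separatorChar, serFormat)); // prints a tab
        }
    }
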
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index dac5d55e5eef09..56acec782c16af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -25,6 +25,7 @@
 import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -69,6 +70,8 @@ public final class HiveUtil {
     public static final String COMPRESSION_KEY = "compression";
     public static final Set<String> SUPPORTED_ORC_COMPRESSIONS = ImmutableSet.of("plain", "zlib", "snappy", "zstd");
     public static final Set<String> SUPPORTED_PARQUET_COMPRESSIONS = ImmutableSet.of("plain", "snappy", "zstd");
+    public static final Set<String> SUPPORTED_TEXT_COMPRESSIONS =
+            ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy");
 
     private HiveUtil() {
     }
@@ -191,7 +194,6 @@ public static Table toHiveTable(HiveTableMetadata hiveTable) {
         Table table = new Table();
         table.setDbName(hiveTable.getDbName());
         table.setTableName(hiveTable.getTableName());
-        // table.setOwner("");
         int createTime = (int) System.currentTimeMillis() * 1000;
         table.setCreateTime(createTime);
         table.setLastAccessTime(createTime);
@@ -211,10 +213,10 @@ public static Table toHiveTable(HiveTableMetadata hiveTable) {
         setCompressType(hiveTable, props);
         // set hive table comment by table properties
         props.put("comment", hiveTable.getComment());
-        table.setParameters(props);
         if (props.containsKey("owner")) {
             table.setOwner(props.get("owner"));
         }
+        HiveProperties.setTableProperties(table, props);
         return table;
     }
@@ -232,6 +234,12 @@ private static void setCompressType(HiveTableMetadata hiveTable, Map<String, String> props) {
@@ ... @@ private static StorageDescriptor toHiveStorageDesc(List<Column> columns,
         sd.setBucketCols(bucketCols);
         sd.setNumBuckets(numBuckets);
         Map<String, String> parameters = new HashMap<>();
-        parameters.put("tag", "doris external hive talbe");
+        parameters.put("tag", "doris external hive table");
         sd.setParameters(parameters);
         return sd;
     }
@@ -266,6 +274,10 @@ private static void setFileFormat(String fileFormat, StorageDescriptor sd) {
             inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
             outputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
             serDe = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
+        } else if (fileFormat.equalsIgnoreCase("text")) {
+            inputFormat = "org.apache.hadoop.mapred.TextInputFormat";
+            outputFormat = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+            serDe = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
         } else {
             throw new IllegalArgumentException("Creating table with an unsupported file format: " + fileFormat);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 0dcf4724a7b1e8..435967cef0e70a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -38,6 +38,7 @@
 import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
 import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.datasource.hive.HiveProperties;
 import org.apache.doris.datasource.hive.HiveTransaction;
 import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
@@ -57,6 +58,7 @@
 import lombok.Setter;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -65,7 +67,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -77,26 +78,6 @@ public class HiveScanNode extends FileQueryScanNode {
 
     private static final Logger LOG = LogManager.getLogger(HiveScanNode.class);
 
-    public static final String PROP_FIELD_DELIMITER = "field.delim";
-    public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
-    public static final String PROP_LINE_DELIMITER = "line.delim";
-    public static final String DEFAULT_LINE_DELIMITER = "\n";
-    public static final String PROP_SEPARATOR_CHAR = "separatorChar";
-    public static final String PROP_QUOTE_CHAR = "quoteChar";
-    public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";
-
-    public static final String PROP_COLLECTION_DELIMITER_HIVE2 = "colelction.delim";
-    public static final String PROP_COLLECTION_DELIMITER_HIVE3 = "collection.delim";
-    public static final String DEFAULT_COLLECTION_DELIMITER = "\2";
-
-    public static final String PROP_MAP_KV_DELIMITER = "mapkey.delim";
-    public static final String DEFAULT_MAP_KV_DELIMITER = "\003";
-
-    public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
-    public static final String DEFAULT_ESCAPE_DELIMIER = "\\";
-    public static final String PROP_NULL_FORMAT = "serialization.null.format";
-    public static final String DEFAULT_NULL_FORMAT = "\\N";
-
     protected final HMSExternalTable hmsTable;
 
     private HiveTransaction hiveTransaction = null;
@@ -431,57 +412,21 @@ protected Map<String, String> getLocationProperties() throws UserException {
     @Override
     protected TFileAttributes getFileAttributes() throws UserException {
         TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
-
+        Table table = hmsTable.getRemoteTable();
         // 1. set column separator
-        Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
-                PROP_FIELD_DELIMITER);
-        Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
-                PROP_SERIALIZATION_FORMAT);
-        Optional<String> columnSeparator = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
-                PROP_SEPARATOR_CHAR);
-        textParams.setColumnSeparator(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                DEFAULT_FIELD_DELIMITER, fieldDelim, columnSeparator, serFormat)));
+        textParams.setColumnSeparator(HiveProperties.getColumnSeparator(table));
         // 2. set line delimiter
-        Optional<String> lineDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
-                PROP_LINE_DELIMITER);
-        textParams.setLineDelimiter(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                DEFAULT_LINE_DELIMITER, lineDelim)));
+        textParams.setLineDelimiter(HiveProperties.getLineDelimiter(table));
         // 3. set mapkv delimiter
-        Optional<String> mapkvDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
-                PROP_MAP_KV_DELIMITER);
-        textParams.setMapkvDelimiter(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                DEFAULT_MAP_KV_DELIMITER, mapkvDelim)));
+        textParams.setMapkvDelimiter(HiveProperties.getMapKvDelimiter(table));
         // 4. set collection delimiter
-        Optional<String> collectionDelimHive2 = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
-                PROP_COLLECTION_DELIMITER_HIVE2);
-        Optional<String> collectionDelimHive3 = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
-                PROP_COLLECTION_DELIMITER_HIVE3);
-        textParams.setCollectionDelimiter(
-                HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                        DEFAULT_COLLECTION_DELIMITER, collectionDelimHive2, collectionDelimHive3)));
+        textParams.setCollectionDelimiter(HiveProperties.getCollectionDelimiter(table));
         // 5. set quote char
-        Map<String, String> serdeParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
-        if (serdeParams.containsKey(PROP_QUOTE_CHAR)) {
-            textParams.setEnclose(serdeParams.get(PROP_QUOTE_CHAR).getBytes()[0]);
-        }
+        HiveProperties.getQuoteChar(table).ifPresent(d -> textParams.setEnclose(d.getBytes()[0]));
         // 6. set escape delimiter
-        Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
-                PROP_ESCAPE_DELIMITER);
-        if (escapeDelim.isPresent()) {
-            String escape = HiveMetaStoreClientHelper.getByte(
-                    escapeDelim.get());
-            if (escape != null) {
-                textParams
-                        .setEscape(escape.getBytes()[0]);
-            } else {
-                textParams.setEscape(DEFAULT_ESCAPE_DELIMIER.getBytes()[0]);
-            }
-        }
+        HiveProperties.getEscapeDelimiter(table).ifPresent(d -> textParams.setEscape(d.getBytes()[0]));
         // 7. set null format
-        Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
-                PROP_NULL_FORMAT);
-        textParams.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                DEFAULT_NULL_FORMAT, nullFormat));
+        textParams.setNullFormat(HiveProperties.getNullFormat(table));
 
         TFileAttributes fileAttributes = new TFileAttributes();
         fileAttributes.setTextParams(textParams);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index 56ff188f964fe9..4b4e76bea47aca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -21,12 +21,10 @@
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.thrift.TFileAttributes;
 
 import org.apache.iceberg.Table;
 
@@ -74,11 +72,6 @@ public TableIf getTargetTable() {
         return icebergExtTable;
     }
 
-    @Override
-    public TFileAttributes getFileAttributes() throws UserException {
-        return new TFileAttributes();
-    }
-
     @Override
     public ExternalCatalog getCatalog() {
         return icebergExtTable.getCatalog();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 5e9860171d0fbe..531f4e4ae3c8d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -22,14 +22,10 @@
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileTextScanRangeParams;
 
 import java.util.Map;
 
@@ -70,18 +66,6 @@ public TableIf getTargetTable() {
         return hmsTable;
     }
 
-    @Override
-    public TFileAttributes getFileAttributes() throws UserException {
-        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
-        textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
-                .getOrDefault(HiveScanNode.PROP_FIELD_DELIMITER, HiveScanNode.DEFAULT_FIELD_DELIMITER));
-        textParams.setLineDelimiter(HiveScanNode.DEFAULT_LINE_DELIMITER);
-        TFileAttributes fileAttributes = new TFileAttributes();
-        fileAttributes.setTextParams(textParams);
-        fileAttributes.setHeaderType("");
-        return fileAttributes;
-    }
-
     @Override
     public ExternalCatalog getCatalog() {
         return hmsTable.getCatalog();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
index b4b1bf2a805d12..be1ce7521061d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
@@ -21,9 +21,7 @@
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.thrift.TFileAttributes;
 
 public interface IcebergSource {
 
@@ -33,8 +31,6 @@ public interface IcebergSource {
 
     TableIf getTargetTable();
 
-    TFileAttributes getFileAttributes() throws UserException;
-
     ExternalCatalog getCatalog();
 
     String getFileFormat() throws DdlException, MetaNotFoundException;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 676241d06d871a..d1f8ab411eaa25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -25,7 +25,7 @@
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.hive.HiveProperties;
 import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
@@ -42,7 +42,9 @@
 import org.apache.doris.thrift.THiveSerDeProperties;
 import org.apache.doris.thrift.THiveTableSink;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -52,20 +54,6 @@
 import java.util.stream.Collectors;
 
 public class HiveTableSink extends BaseExternalTableDataSink {
-    public static final String PROP_FIELD_DELIMITER = "field.delim";
-    public static final String DEFAULT_FIELD_DELIMITER = "\1";
-    public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";
-    public static final String PROP_LINE_DELIMITER = "line.delim";
-    public static final String DEFAULT_LINE_DELIMITER = "\n";
-    public static final String PROP_COLLECT_DELIMITER = "collection.delim";
-    public static final String DEFAULT_COLLECT_DELIMITER = "\2";
-    public static final String PROP_MAPKV_DELIMITER = "mapkv.delim";
-    public static final String DEFAULT_MAPKV_DELIMITER = "\3";
-    public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
-    public static final String DEFAULT_ESCAPE_DELIMIER = "\\";
-    public static final String PROP_NULL_FORMAT = "serialization.null.format";
-    public static final String DEFAULT_NULL_FORMAT = "\\N";
-
     private final HMSExternalTable targetTable;
     private static final HashSet<TFileFormatType> supportedTypes = new HashSet<TFileFormatType>() {{
             add(TFileFormatType.FORMAT_CSV_PLAIN);
@@ -184,10 +172,13 @@ private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
                 compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
                 break;
             case FORMAT_CSV_PLAIN:
-                compressType = ConnectContext.get().getSessionVariable().hiveTextCompression();
+                compressType = targetTable.getRemoteTable().getParameters().get("text.compression");
+                if (Strings.isNullOrEmpty(compressType)) {
+                    compressType = ConnectContext.get().getSessionVariable().hiveTextCompression();
+                }
                 break;
             default:
-                compressType = "uncompressed";
+                compressType = "plain";
                 break;
         }
         tSink.setCompressionType(getTFileCompressType(compressType));
@@ -218,47 +209,19 @@ private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
 
     private void setSerDeProperties(THiveTableSink tSink) {
         THiveSerDeProperties serDeProperties = new THiveSerDeProperties();
+        Table table = targetTable.getRemoteTable();
         // 1. set field delimiter
-        Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
-                PROP_FIELD_DELIMITER);
-        Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
-                PROP_SERIALIZATION_FORMAT);
-        serDeProperties.setFieldDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                DEFAULT_FIELD_DELIMITER, fieldDelim, serFormat)));
+        serDeProperties.setFieldDelim(HiveProperties.getFieldDelimiter(table));
         // 2. set line delimiter
-        Optional<String> lineDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
-                PROP_LINE_DELIMITER);
-        serDeProperties.setLineDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                DEFAULT_LINE_DELIMITER, lineDelim)));
+        serDeProperties.setLineDelim(HiveProperties.getLineDelimiter(table));
         // 3. set collection delimiter
-        Optional<String> collectDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
-                PROP_COLLECT_DELIMITER);
-        serDeProperties
-                .setCollectionDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                        DEFAULT_COLLECT_DELIMITER, collectDelim)));
+        serDeProperties.setCollectionDelim(HiveProperties.getCollectionDelimiter(table));
         // 4. set mapkv delimiter
-        Optional<String> mapkvDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
-                PROP_MAPKV_DELIMITER);
-        serDeProperties.setMapkvDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                DEFAULT_MAPKV_DELIMITER, mapkvDelim)));
+        serDeProperties.setMapkvDelim(HiveProperties.getMapKvDelimiter(table));
         // 5. set escape delimiter
-        Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
-                PROP_ESCAPE_DELIMITER);
-        if (escapeDelim.isPresent()) {
-            String escape = HiveMetaStoreClientHelper.getByte(
-                    escapeDelim.get());
-            if (escape != null) {
-                serDeProperties
-                        .setEscapeChar(escape);
-            } else {
-                serDeProperties.setEscapeChar(DEFAULT_ESCAPE_DELIMIER);
-            }
-        }
+        HiveProperties.getEscapeDelimiter(table).ifPresent(serDeProperties::setEscapeChar);
         // 6. set null format
-        Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
-                PROP_NULL_FORMAT);
-        serDeProperties.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
-                DEFAULT_NULL_FORMAT, nullFormat));
+        serDeProperties.setNullFormat(HiveProperties.getNullFormat(table));
 
         tSink.setSerdeProperties(serDeProperties);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d7a7016c70df28..fcd496ac89b751 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1153,7 +1153,7 @@ public class SessionVariable implements Serializable, Writable {
     public int sortPhaseNum = 0;
 
     @VariableMgr.VarAttr(name = HIVE_TEXT_COMPRESSION, needForward = true)
-    private String hiveTextCompression = "uncompressed";
+    private String hiveTextCompression = "plain";
 
     @VariableMgr.VarAttr(name = READ_CSV_EMPTY_LINE_AS_NULL, needForward = true,
             description = {"在读取csv文件时是否读取csv的空行为null",
@@ -4127,6 +4127,10 @@ public void setParallelResultSink(Boolean enableParallelResultSink) {
     }
 
     public String hiveTextCompression() {
+        if (hiveTextCompression.equals("uncompressed")) {
+            // "uncompressed" is the legacy spelling; treat it as "plain" for compatibility.
+            return "plain";
+        }
         return hiveTextCompression;
     }
 
diff --git a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl_text_format.groovy b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl_text_format.groovy
new file mode 100644
index 00000000000000..aaa5b198e69c85
--- /dev/null
+++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl_text_format.groovy
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_ddl_text_format", "p0,external,hive,external_docker,external_docker_hive") {
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String hms_port = context.config.otherConfigs.get("hive3HmsPort")
+        String hdfs_port = context.config.otherConfigs.get("hive3HdfsPort")
+        String catalog_name = "test_hive_ddl_text_format"
+
+        sql """drop catalog if exists ${catalog_name};"""
+
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+                'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}',
+                'use_meta_cache' = 'true'
+            );
+        """
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch ${catalog_name};"""
+        logger.info("switched to catalog " + catalog_name)
+        sql """use `default`;"""
+
+        sql """ drop table if exists tb_text """
+        sql """
+            create table tb_text (
+                id int,
+                `name` string
+            ) PROPERTIES (
+                'compression'='gzip',
+                'file_format'='text',
+                'field.delim'='\t',
+                'line.delim'='\n',
+                'collection.delim'=';',
+                'mapkey.delim'=':',
+                'serialization.null.format'='\\N'
+            );
+        """
+
+        String serde = "'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'"
+        String input_format = "'org.apache.hadoop.mapred.TextInputFormat'"
+        String output_format = "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'"
+        String doris_fileformat = "'doris.file_format'='text'"
+        String field_delim = "'field.delim'"
+        String line_delim = "'line.delim'"
+        String mapkey_delim = "'mapkey.delim'"
+
+        def create_tbl_res = sql """ show create table tb_text """
+        String res = create_tbl_res.toString()
+        logger.info("${res}")
+        assertTrue(res.containsIgnoreCase("${serde}"))
+        assertTrue(res.containsIgnoreCase("${input_format}"))
+        assertTrue(res.containsIgnoreCase("${output_format}"))
+        assertTrue(res.containsIgnoreCase("${doris_fileformat}"))
+        assertTrue(res.containsIgnoreCase("${field_delim}"))
+        assertTrue(res.containsIgnoreCase("${line_delim}"))
+        assertTrue(res.containsIgnoreCase("${mapkey_delim}"))
+    }
+}
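
End-to-end, the write path now resolves the text compression codec in a fixed order: the table's "text.compression" parameter first, then the session variable declared as HIVE_TEXT_COMPRESSION, with "uncompressed" kept as a legacy alias for "plain". The standalone sketch below illustrates that order; the resolve method and its names are illustrative, not Doris API.

    import java.util.HashMap;
    import java.util.Map;

    class TextCompressionSketch {
        static String resolve(Map<String, String> tableParams, String sessionValue) {
            String v = tableParams.get("text.compression"); // 1. table property wins
            if (v == null || v.isEmpty()) {
                v = sessionValue;                           // 2. fall back to the session variable
            }
            return "uncompressed".equals(v) ? "plain" : v;  // 3. legacy alias kept for compatibility
        }

        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("text.compression", "gzip");
            System.out.println(resolve(params, "plain"));                 // gzip
            System.out.println(resolve(new HashMap<>(), "uncompressed")); // plain
        }
    }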