diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 824af6996a97bb..a0f7b2722539f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -48,6 +48,7 @@
 import com.google.common.collect.Maps;
 import io.airlift.concurrent.MoreFutures;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -88,6 +89,7 @@ public class HMSTransaction implements Transaction {
 
     private final Map<DatabaseTableName, Action<TableAndMore>> tableActions = new HashMap<>();
     private final Map<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>> partitionActions = new HashMap<>();
+    private final Map<DatabaseTableName, List<FieldSchema>> tableColumns = new HashMap<>();
 
     private final Executor fileSystemExecutor;
     private HmsCommitter hmsCommitter;
@@ -105,7 +107,7 @@ public UncompletedMpuPendingUpload(TS3MPUPendingUpload s3MPUPendingUpload, Strin
         }
     }
 
-    private Set<UncompletedMpuPendingUpload> uncompletedMpuPendingUploads = new HashSet<>();
+    private final Set<UncompletedMpuPendingUpload> uncompletedMpuPendingUploads = new HashSet<>();
 
     public HMSTransaction(HiveMetadataOps hiveOps, FileSystemProvider fileSystemProvider, Executor fileSystemExecutor) {
         this.hiveOps = hiveOps;
@@ -223,7 +225,7 @@ public void finishInsertTable(String dbName, String tbName) {
                         Maps.newHashMap(),
                         sd.getOutputFormat(),
                         sd.getSerdeInfo().getSerializationLib(),
-                        hiveOps.getClient().getSchema(dbName, tbName)
+                        getTableColumns(dbName, tbName)
                 );
                 if (updateMode == TUpdateMode.OVERWRITE) {
                     dropPartition(dbName, tbName, hivePartition.getPartitionValues(), true);
@@ -378,7 +380,7 @@ private void convertToInsertExistingPartitionAction(
                 partition.getParameters(),
                 sd.getOutputFormat(),
                 sd.getSerdeInfo().getSerializationLib(),
-                hiveOps.getClient().getSchema(dbName, tbName)
+                getTableColumns(dbName, tbName)
         );
 
         partitionActionsForTable.put(
@@ -895,6 +897,11 @@ public synchronized Table getTable(String databaseName, String tableName) {
         throw new RuntimeException("Not Found table: " + databaseName + "." + tableName);
     }
 
+    public synchronized List<FieldSchema> getTableColumns(String databaseName, String tableName) {
+        return tableColumns.computeIfAbsent(new DatabaseTableName(databaseName, tableName),
+                key -> hiveOps.getClient().getSchema(databaseName, tableName));
+    }
+
     public synchronized void finishChangingExistingTable(
             ActionType actionType,
             String databaseName,
@@ -1258,7 +1265,7 @@ public void prepareAddPartition(PartitionAndMore partitionAndMore) {
                 Maps.newHashMap(),
                 sd.getOutputFormat(),
                 sd.getSerdeInfo().getSerializationLib(),
-                hiveOps.getClient().getSchema(dbName, tbName)
+                getTableColumns(dbName, tbName)
         );
 
         HivePartitionWithStatistics partitionWithStats =
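
The sketch below is not part of the patch; it is a minimal, self-contained illustration of the caching pattern the change introduces: the first schema lookup for a given (database, table) pair goes to the loader (in the patch, the metastore client's `getSchema`), and later lookups within the same transaction are served from an in-memory map via `computeIfAbsent`. `SchemaCacheSketch`, `DbTable`, and the `BiFunction` loader are hypothetical stand-ins, not Doris or Hive classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

public class SchemaCacheSketch {
    // Hypothetical (database, table) key, standing in for the patch's DatabaseTableName.
    record DbTable(String db, String table) {}

    // One entry per table; lives only as long as this object (i.e. one "transaction").
    private final Map<DbTable, List<String>> tableColumns = new HashMap<>();
    private final BiFunction<String, String, List<String>> schemaLoader;

    public SchemaCacheSketch(BiFunction<String, String, List<String>> schemaLoader) {
        this.schemaLoader = Objects.requireNonNull(schemaLoader);
    }

    // First call per (db, table) invokes the loader (e.g. an RPC to the metastore);
    // subsequent calls return the cached column list.
    public synchronized List<String> getTableColumns(String db, String table) {
        return tableColumns.computeIfAbsent(new DbTable(db, table),
                key -> schemaLoader.apply(key.db(), key.table()));
    }

    public static void main(String[] args) {
        SchemaCacheSketch cache = new SchemaCacheSketch((db, table) -> {
            System.out.println("loading schema for " + db + "." + table);
            return List.of("id", "name");
        });
        cache.getTableColumns("db1", "t1"); // prints "loading schema for db1.t1"
        cache.getTableColumns("db1", "t1"); // served from the map, no second load
    }
}
```

As in the patch, the accessor is `synchronized`, so the map needs no further concurrency control; the cache trades a small amount of memory per transaction for avoiding repeated metastore round-trips when many partitions of the same table are committed.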