Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.common.collect.Maps;
import io.airlift.concurrent.MoreFutures;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class HMSTransaction implements Transaction {
private final Map<DatabaseTableName, Action<TableAndMore>> tableActions = new HashMap<>();
private final Map<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>>
partitionActions = new HashMap<>();
private final Map<DatabaseTableName, List<FieldSchema>> tableColumns = new HashMap<>();

private final Executor fileSystemExecutor;
private HmsCommitter hmsCommitter;
Expand All @@ -105,7 +107,7 @@ public UncompletedMpuPendingUpload(TS3MPUPendingUpload s3MPUPendingUpload, Strin
}
}

private Set<UncompletedMpuPendingUpload> uncompletedMpuPendingUploads = new HashSet<>();
private final Set<UncompletedMpuPendingUpload> uncompletedMpuPendingUploads = new HashSet<>();

public HMSTransaction(HiveMetadataOps hiveOps, FileSystemProvider fileSystemProvider, Executor fileSystemExecutor) {
this.hiveOps = hiveOps;
Expand Down Expand Up @@ -223,7 +225,7 @@ public void finishInsertTable(String dbName, String tbName) {
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
hiveOps.getClient().getSchema(dbName, tbName)
getTableColumns(dbName, tbName)
);
if (updateMode == TUpdateMode.OVERWRITE) {
dropPartition(dbName, tbName, hivePartition.getPartitionValues(), true);
Expand Down Expand Up @@ -378,7 +380,7 @@ private void convertToInsertExistingPartitionAction(
partition.getParameters(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
hiveOps.getClient().getSchema(dbName, tbName)
getTableColumns(dbName, tbName)
);

partitionActionsForTable.put(
Expand Down Expand Up @@ -895,6 +897,11 @@ public synchronized Table getTable(String databaseName, String tableName) {
throw new RuntimeException("Not Found table: " + databaseName + "." + tableName);
}

/**
 * Returns the column schema for the given table, caching the result per
 * {@code (database, table)} pair for the lifetime of this transaction so
 * that repeated lookups (e.g. once per partition being committed) hit the
 * metastore client only once.
 *
 * @param databaseName the Hive database name
 * @param tableName    the Hive table name
 * @return the table's columns as reported by the metastore client
 */
public synchronized List<FieldSchema> getTableColumns(String databaseName, String tableName) {
    // BUG FIX: the lambda previously called getSchema(dbName, tbName), which are
    // not this method's parameters — use the same names the cache key is built from
    // so the cached value always matches its key.
    return tableColumns.computeIfAbsent(new DatabaseTableName(databaseName, tableName),
            key -> hiveOps.getClient().getSchema(databaseName, tableName));
}

public synchronized void finishChangingExistingTable(
ActionType actionType,
String databaseName,
Expand Down Expand Up @@ -1258,7 +1265,7 @@ public void prepareAddPartition(PartitionAndMore partitionAndMore) {
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
hiveOps.getClient().getSchema(dbName, tbName)
getTableColumns(dbName, tbName)
);

HivePartitionWithStatistics partitionWithStats =
Expand Down