Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
Collection<ManifestEntry> mergedEntries = ManifestEntry.mergeEntries(entries);
long skippedByPartitionAndStats = startDataFiles - cntEntries.get();
for (ManifestEntry file : mergedEntries) {
// cache the data schema
schemaManager.schema(file.file().schemaId());

if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -80,9 +81,12 @@ public class SchemaManager implements Serializable {

@Nullable private transient Lock lock;

private final Map<Long, TableSchema> cache;

public SchemaManager(FileIO fileIO, Path tableRoot) {
this.fileIO = fileIO;
this.tableRoot = tableRoot;
this.cache = new ConcurrentHashMap<>();
}

public SchemaManager withLock(@Nullable Lock lock) {
Expand Down Expand Up @@ -363,6 +367,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
try {
boolean success = commit(newSchema);
if (success) {
cache.put(newSchema.id(), newSchema);
return newSchema;
}
} catch (Exception e) {
Expand Down Expand Up @@ -467,6 +472,10 @@ boolean commit(TableSchema newSchema) throws Exception {

/** Read schema for schema id. */
public TableSchema schema(long id) {
return cache.computeIfAbsent(id, this::loadSchemaFromFile);
}

private TableSchema loadSchemaFromFile(Long id) {
try {
return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class);
} catch (IOException e) {
Expand Down Expand Up @@ -507,6 +516,7 @@ public Path branchSchemaPath(String branchName, long schemaId) {
*/
public void deleteSchema(long schemaId) {
fileIO.deleteQuietly(toSchemaPath(schemaId));
this.cache.remove(schemaId);
}

public static void checkAlterTableOption(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
protected final TableSchema tableSchema;
protected final CatalogEnvironment catalogEnvironment;

protected final SchemaManager tableSchemaManager;

protected AbstractFileStoreTable(
FileIO fileIO,
Path path,
Expand All @@ -97,6 +99,7 @@ protected AbstractFileStoreTable(
}
this.tableSchema = tableSchema;
this.catalogEnvironment = catalogEnvironment;
tableSchemaManager = new SchemaManager(fileIO, path);
}

@Override
Expand Down Expand Up @@ -259,7 +262,7 @@ public FileStoreTable copyWithLatestSchema() {
}

protected SchemaManager schemaManager() {
return new SchemaManager(fileIO(), path);
return tableSchemaManager;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,23 @@ public void afterEach() {
}
}

@Test
public void testCache() throws Exception {
Path path = new Path(tempDir.toString());
SchemaManager manager = new SchemaManager(FileIOFinder.find(path), path);
// schema-0
manager.createTable(schema);
// schema-1
manager.commitChanges(SchemaChange.setOption("aaa", "bbb"));
// schema-2
manager.commitChanges(SchemaChange.setOption("ccc", "ddd"));

SchemaManager newSchemaManager = new SchemaManager(FileIOFinder.find(path), path);
Optional<TableSchema> schemaOpt1 = newSchemaManager.latest();
Optional<TableSchema> schemaOpt2 = newSchemaManager.latest();
assertThat(schemaOpt1.get()).isSameAs(schemaOpt2.get());
}

@Test
public void testCreateTable() throws Exception {
TableSchema tableSchema = retryArtificialException(() -> manager.createTable(schema));
Expand Down