Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ private void init() {
db.setRemoteName(remoteDbName);
}
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId);
initCatalogLog.addRefreshDb(dbId, remoteDbName);
} else {
dbId = Env.getCurrentEnv().getNextId();
tmpDbNameToId.put(localDbName, dbId);
Expand Down Expand Up @@ -726,8 +726,12 @@ public void replayInitCatalog(InitCatalogLog log) {
if (LOG.isDebugEnabled()) {
LOG.debug("replay init external catalog[{}]: {}", name, log);
}
// If the remote name is missing during upgrade, all databases in the Map will be reinitialized.
if (log.getCreateCount() > 0 && (log.getRemoteDbNames() == null || log.getRemoteDbNames().isEmpty())) {
// If the remote name is missing during upgrade, or
// the refresh db's remote name is empty,
// all databases in the Map will be reinitialized.
if ((log.getCreateCount() > 0 && (log.getRemoteDbNames() == null || log.getRemoteDbNames().isEmpty()))
|| (log.getRefreshCount() > 0
&& (log.getRefreshRemoteDbNames() == null || log.getRefreshRemoteDbNames().isEmpty()))) {
dbNameToId = Maps.newConcurrentMap();
idToDb = Maps.newConcurrentMap();
lastUpdateTime = log.getLastUpdateTime();
Expand All @@ -747,6 +751,7 @@ public void replayInitCatalog(InitCatalogLog log) {
log.getRefreshDbIds().get(i), name);
continue;
}
db.get().setRemoteName(log.getRefreshRemoteDbNames().get(i));
Preconditions.checkNotNull(db.get());
tmpDbNameToId.put(db.get().getFullName(), db.get().getId());
tmpIdToDb.put(db.get().getId(), db.get());
Expand All @@ -763,6 +768,18 @@ public void replayInitCatalog(InitCatalogLog log) {
db.getFullName(), db.getId(), log.getRemoteDbNames().get(i));
}
}
// Check whether the remoteName of db in tmpIdToDb is empty
for (ExternalDatabase<? extends ExternalTable> db : tmpIdToDb.values()) {
if (Strings.isNullOrEmpty(db.getRemoteName())) {
LOG.info("Database [{}] remoteName is empty in catalog [{}], mark as uninitialized",
db.getFullName(), name);
dbNameToId = Maps.newConcurrentMap();
idToDb = Maps.newConcurrentMap();
lastUpdateTime = log.getLastUpdateTime();
initialized = false;
return;
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
lastUpdateTime = log.getLastUpdateTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
LOG.debug("replay init external db[{}.{}]: {}", name, catalog.getName(), log);
}
// If the remote name is missing during upgrade, all tables in the Map will be reinitialized.
if (log.getCreateCount() > 0 && (log.getRemoteTableNames() == null || log.getRemoteTableNames().isEmpty())) {
if ((log.getCreateCount() > 0 && (log.getRemoteTableNames() == null || log.getRemoteTableNames().isEmpty()))
|| (log.getRefreshCount() > 0
&& (log.getRefreshRemoteTableNames() == null || log.getRefreshRemoteTableNames().isEmpty()))) {
tableNameToId = Maps.newConcurrentMap();
idToTbl = Maps.newConcurrentMap();
lastUpdateTime = log.getLastUpdateTime();
Expand All @@ -212,6 +214,7 @@ public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
// So we need add a validation here to avoid table(s) not found, this is just a temporary solution
// because later we will remove all the logics about InitCatalogLog/InitDatabaseLog.
if (table.isPresent()) {
table.get().setRemoteName(log.getRefreshRemoteTableNames().get(i));
tmpTableNameToId.put(table.get().getName(), table.get().getId());
tmpIdToTbl.put(table.get().getId(), table.get());

Expand All @@ -237,6 +240,19 @@ public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
LOG.info("Synchronized table (create): [Name: {}, ID: {}, Remote Name: {}]",
table.getName(), table.getId(), log.getRemoteTableNames().get(i));
}
// Check whether the remoteName and db Tbl db in idToTbl is empty
for (T table : idToTbl.values()) {
if (Strings.isNullOrEmpty(table.getRemoteName())
|| table.getDb() == null) {
LOG.info("Table [{}] remoteName or database is empty, mark as uninitialized",
table.getName());
tableNameToId = Maps.newConcurrentMap();
idToTbl = Maps.newConcurrentMap();
lastUpdateTime = log.getLastUpdateTime();
initialized = false;
return;
}
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
lastUpdateTime = log.getLastUpdateTime();
Expand Down Expand Up @@ -272,7 +288,7 @@ private void init() {
table.setDb(this);
}
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addRefreshTable(tblId);
initDatabaseLog.addRefreshTable(tblId, remoteTableName);
} else {
tblId = Env.getCurrentEnv().getNextId();
tmpTableNameToId.put(localTableName, tblId);
Expand Down Expand Up @@ -629,14 +645,22 @@ public void gsonPostProcess() throws IOException {
case "ExternalInfoSchemaTable":
ExternalInfoSchemaTable infoSchemaTable = GsonUtils.GSON.fromJson(GsonUtils.GSON.toJson(obj),
ExternalInfoSchemaTable.class);
if (infoSchemaTable.getDb() == null) {
infoSchemaTable.setDb(this);
}
tmpIdToTbl.put(infoSchemaTable.getId(), (T) infoSchemaTable);
tableNameToId.put(infoSchemaTable.getName(), infoSchemaTable.getId());
lowerCaseToTableName.put(infoSchemaTable.getName().toLowerCase(), infoSchemaTable.getName());
break;
case "ExternalMysqlTable":
ExternalMysqlTable mysqlTable = GsonUtils.GSON.fromJson(GsonUtils.GSON.toJson(obj),
ExternalMysqlTable.class);
if (mysqlTable.getDb() == null) {
mysqlTable.setDb(this);
}
tmpIdToTbl.put(mysqlTable.getId(), (T) mysqlTable);
tableNameToId.put(mysqlTable.getName(), mysqlTable.getId());
lowerCaseToTableName.put(mysqlTable.getName().toLowerCase(), mysqlTable.getName());
break;
default:
break;
Expand All @@ -649,6 +673,14 @@ public void gsonPostProcess() throws IOException {
((ExternalTable) obj).getName());
}
}
// Check whether the remoteName and db Tbl db in idToTbl is empty
for (T table : idToTbl.values()) {
if (Strings.isNullOrEmpty(table.getRemoteName())
|| table.getDb() == null) {
initialized = false;
break;
}
}
idToTbl = tmpIdToTbl;
rwLock = new MonitoredReentrantReadWriteLock(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public enum Type {
@SerializedName(value = "refreshDbIds")
private List<Long> refreshDbIds;

@SerializedName(value = "refreshRemoteDbNames")
private List<String> refreshRemoteDbNames;

@SerializedName(value = "createDbIds")
private List<Long> createDbIds;

Expand All @@ -76,15 +79,17 @@ public InitCatalogLog() {
createCount = 0;
catalogId = 0;
refreshDbIds = Lists.newArrayList();
refreshRemoteDbNames = Lists.newArrayList();
createDbIds = Lists.newArrayList();
createDbNames = Lists.newArrayList();
remoteDbNames = Lists.newArrayList();
type = Type.UNKNOWN;
}

public void addRefreshDb(long id) {
public void addRefreshDb(long id, String remoteName) {
refreshCount += 1;
refreshDbIds.add(id);
refreshRemoteDbNames.add(remoteName);
}

public void addCreateDb(long id, String name, String remoteName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public enum Type {
@SerializedName(value = "refreshTableIds")
private List<Long> refreshTableIds;

@SerializedName(value = "refreshRemoteTableNames")
private List<String> refreshRemoteTableNames;

@SerializedName(value = "createTableIds")
private List<Long> createTableIds;

Expand All @@ -81,15 +84,17 @@ public InitDatabaseLog() {
catalogId = 0;
dbId = 0;
refreshTableIds = Lists.newArrayList();
refreshRemoteTableNames = Lists.newArrayList();
createTableIds = Lists.newArrayList();
createTableNames = Lists.newArrayList();
remoteTableNames = Lists.newArrayList();
type = Type.UNKNOWN;
}

public void addRefreshTable(long id) {
public void addRefreshTable(long id, String remoteName) {
refreshCount += 1;
refreshTableIds.add(id);
refreshRemoteTableNames.add(remoteName);
}

public void addCreateTable(long id, String name, String remoteName) {
Expand Down