Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -590,13 +590,20 @@ private void checkAndPrepareMeta() {
status = st;
return;
}
LOG.debug("get intersect part names: {}, job: {}", intersectPartNames, this);
if (!localOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION, intersectPartNames)
.equals(remoteOlapTbl.getSignature(
BackupHandler.SIGNATURE_VERSION, intersectPartNames))) {
if (LOG.isDebugEnabled()) {
LOG.debug("get intersect part names: {}, job: {}", intersectPartNames, this);
}
String localTblSignature = localOlapTbl.getSignature(
BackupHandler.SIGNATURE_VERSION, intersectPartNames);
String remoteTblSignature = remoteOlapTbl.getSignature(
BackupHandler.SIGNATURE_VERSION, intersectPartNames);
if (!localTblSignature.equals(remoteTblSignature)) {
String alias = jobInfo.getAliasByOriginNameIfSet(tableName);
LOG.warn("Table {} already exists but with different schema, "
+ "local table: {}, remote table: {}",
alias, localTblSignature, remoteTblSignature);
status = new Status(ErrCode.COMMON_ERROR, "Table "
+ jobInfo.getAliasByOriginNameIfSet(tableName)
+ " already exist but with different schema");
+ alias + " already exist but with different schema");
return;
}

Expand Down
57 changes: 46 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -597,17 +597,27 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore
// entry.getKey() is the new partition id, use it to get the restore specified
// replica allocation
ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(entry.getKey());
// save the materialized indexes before creating the new indexes, to avoid id conflicts
// between the two clusters.
Map<Long, MaterializedIndex> idToIndex = Maps.newHashMap();
for (Map.Entry<Long, String> entry2 : origIdxIdToName.entrySet()) {
MaterializedIndex idx = partition.getIndex(entry2.getKey());
long newIdxId = indexNameToId.get(entry2.getValue());
int schemaHash = indexIdToMeta.get(newIdxId).getSchemaHash();
idx.setIdForRestore(newIdxId);
idToIndex.put(newIdxId, idx);
if (newIdxId != baseIndexId) {
// not base table, reset
// not base table, delete it.
partition.deleteRollupIndex(entry2.getKey());
}
}
for (Map.Entry<Long, MaterializedIndex> entry2 : idToIndex.entrySet()) {
Long idxId = entry2.getKey();
MaterializedIndex idx = entry2.getValue();
if (idxId != baseIndexId) {
// not base table, add it.
partition.createRollupIndex(idx);
}

int schemaHash = indexIdToMeta.get(idxId).getSchemaHash();
// generate new tablets in origin tablet order
int tabletNum = idx.getTablets().size();
idx.clearTabletsForRestore();
Expand Down Expand Up @@ -639,6 +649,15 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore
partition.setIdForRestore(entry.getKey());
}

// reset the indexes and update the indexes in materialized index meta too.
List<Index> indexes = this.indexes.getIndexes();
for (Index idx : indexes) {
idx.setIndexId(env.getNextId());
}
for (Map.Entry<Long, MaterializedIndexMeta> entry : indexIdToMeta.entrySet()) {
entry.getValue().setIndexes(indexes);
}

return Status.OK;
}

Expand Down Expand Up @@ -1253,12 +1272,12 @@ public CreateTableStmt toCreateTableStmt(String dbName) {
throw new RuntimeException("Don't support anymore");
}

// Get the md5 of signature string of this table with specified partitions.
// Get the signature string of this table with specified partitions.
// This method is used to determine whether the tables have the same schema.
// Contains:
// table name, table type, index name, index schema, short key column count, storage type
// bloom filter, partition type and columns, distribution type and columns.
// buckets number.
// table name, table type, index name, index schema, short key column count, storage type,
// bloom filter, partition type and columns, distribution type and columns, buckets number,
// indexes and columns.
public String getSignature(int signatureVersion, List<String> partNames) {
StringBuilder sb = new StringBuilder(signatureVersion);
sb.append(name);
Expand Down Expand Up @@ -1304,9 +1323,25 @@ public String getSignature(int signatureVersion, List<String> partNames) {
}
}

String md5 = DigestUtils.md5Hex(sb.toString());
LOG.debug("get signature of table {}: {}. signature string: {}", name, md5, sb.toString());
return md5;
// indexes
if (this.indexes != null) {
Map<String, Index> indexes = Maps.newTreeMap();
for (Index idx : this.indexes.getIndexes()) {
indexes.put(idx.getIndexName(), idx);
}
for (Map.Entry<String, Index> entry : indexes.entrySet()) {
Index idx = entry.getValue();
sb.append(entry.getKey());
sb.append(idx.getIndexType());
sb.append(Joiner.on(",").join(idx.getColumns()));
}
}

String signature = sb.toString();
if (LOG.isDebugEnabled()) {
LOG.debug("get signature of table {}. signature string: {}", name, sb.toString());
}
return signature;
}

// get intersect partition names with the given table "anotherTbl". not including temp partitions
Expand Down