Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,17 @@ private void checkAndPrepareMeta() {
if (LOG.isDebugEnabled()) {
LOG.debug("get intersect part names: {}, job: {}", intersectPartNames, this);
}
if (!localOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION, intersectPartNames)
.equals(remoteOlapTbl.getSignature(
BackupHandler.SIGNATURE_VERSION, intersectPartNames))) {
String localTblSignature = localOlapTbl.getSignature(
BackupHandler.SIGNATURE_VERSION, intersectPartNames);
String remoteTblSignature = remoteOlapTbl.getSignature(
BackupHandler.SIGNATURE_VERSION, intersectPartNames);
if (!localTblSignature.equals(remoteTblSignature)) {
String alias = jobInfo.getAliasByOriginNameIfSet(tableName);
LOG.warn("Table {} already exists but with different schema, "
+ "local table: {}, remote table: {}",
alias, localTblSignature, remoteTblSignature);
status = new Status(ErrCode.COMMON_ERROR, "Table "
+ jobInfo.getAliasByOriginNameIfSet(tableName)
+ " already exist but with different schema");
+ alias + " already exist but with different schema");
return;
}

Expand Down
30 changes: 22 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -1381,12 +1381,12 @@ public long getDataLength() {
return dataSize;
}

// Get the md5 of signature string of this table with specified partitions.
// Get the signature string of this table with specified partitions.
// This method is used to determine whether the tables have the same schema.
// Contains:
// table name, table type, index name, index schema, short key column count, storage type
// bloom filter, partition type and columns, distribution type and columns.
// buckets number.
// table name, table type, index name, index schema, short key column count, storage type,
// bloom filter, partition type and columns, distribution type and columns, buckets number,
// indexes and columns.
public String getSignature(int signatureVersion, List<String> partNames) {
StringBuilder sb = new StringBuilder(signatureVersion);
sb.append(name);
Expand Down Expand Up @@ -1432,11 +1432,25 @@ public String getSignature(int signatureVersion, List<String> partNames) {
}
}

String md5 = DigestUtils.md5Hex(sb.toString());
// indexes
if (this.indexes != null) {
Map<String, Index> indexes = Maps.newTreeMap();
for (Index idx : this.indexes.getIndexes()) {
indexes.put(idx.getIndexName(), idx);
}
for (Map.Entry<String, Index> entry : indexes.entrySet()) {
Index idx = entry.getValue();
sb.append(entry.getKey());
sb.append(idx.getIndexType());
sb.append(Joiner.on(",").join(idx.getColumns()));
}
}

String signature = sb.toString();
if (LOG.isDebugEnabled()) {
LOG.debug("get signature of table {}: {}. signature string: {}", name, md5, sb.toString());
LOG.debug("get signature of table {}. signature string: {}", name, sb.toString());
}
return md5;
return signature;
}

// get intersect partition names with the given table "anotherTbl". not including temp partitions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test: verifies that (1) an NGRAM_BF index and a bloom-filter column
// survive a full BACKUP/RESTORE round trip, and (2) restoring a snapshot onto an
// existing table whose index set differs is rejected with a schema-mismatch error
// ("already exist but with different schema").
suite("test_backup_restore_overwrite_indexes", "backup_restore") {
    String suiteName = "test_backup_restore_overwrite_indexes"
    String repoName = "${suiteName}_repo"
    String dbName = "${suiteName}_db"
    String newDbName = "${suiteName}_new_db"
    String tableName = "${suiteName}_table"
    String snapshotName = "${suiteName}_snapshot"


    def syncer = getSyncer()
    syncer.createS3Repository(repoName)

    sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
    sql "CREATE DATABASE IF NOT EXISTS ${newDbName}"
    sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
    // Source table carries both kinds of index metadata under test:
    // an NGRAM_BF index on `username` and a bloom filter on `id`.
    sql """
        CREATE TABLE if NOT EXISTS ${dbName}.${tableName}
        (
            `test` INT,
            `id` INT,
            `username` varchar(32) NULL DEFAULT "",
            `only4test` varchar(32) NULL DEFAULT "",
            INDEX idx_ngrambf (`username`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256")
        )
        ENGINE=OLAP
        DUPLICATE KEY(`test`, `id`)
        DISTRIBUTED BY HASH(id) BUCKETS 1
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "bloom_filter_columns" = "id"
        )
        """
    // Seed a few rows so the snapshot contains data, not just schema.
    for (int index = 0; index < 10; index++) {
        sql """
            INSERT INTO ${dbName}.${tableName} VALUES (${index}, ${index}, "test_${index}", "${index}_test")
            """
    }

    // True when SHOW INDEXES output lists the NGRAM_BF index
    // (row[2] = index name, row[10] = index type in that result set).
    def checkNgramBf = { inputRes -> Boolean
        for (List<Object> row : inputRes) {
            if (row[2] == "idx_ngrambf" && row[10] == "NGRAM_BF") {
                return true
            }
        }
        return false
    }
    // True when SHOW CREATE TABLE output (row[1] = CREATE stmt) declares
    // the bloom filter column property.
    def checkBloomFilter = { inputRes -> Boolean
        for (List<Object> row : inputRes) {
            if ((row[1] as String).contains("\"bloom_filter_columns\" = \"id\"")) {
                return true
            }
        }
        return false
    }
    // Sanity check: both index kinds exist on the source table before backup.
    List<List<Object>> res = sql "SHOW INDEXES FROM ${dbName}.${tableName}"
    assertTrue(checkNgramBf(res));
    res = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
    assertTrue(checkBloomFilter(res));

    // Take a full backup of the table.
    sql """
        BACKUP SNAPSHOT ${dbName}.${snapshotName}
        TO `${repoName}`
        ON (${tableName})
        PROPERTIES ("type" = "full")
        """

    syncer.waitSnapshotFinish(dbName)

    def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)

    assertTrue(snapshot != null)

    // Restore into a fresh database; this creates the table there.
    sql """
        RESTORE SNAPSHOT ${newDbName}.${snapshotName}
        FROM `${repoName}`
        ON ( `${tableName}`)
        PROPERTIES
        (
            "backup_timestamp" = "${snapshot}",
            "reserve_replica" = "true"
        )
        """

    syncer.waitAllRestoreFinish(newDbName)

    // Both the NGRAM_BF index and the bloom filter must survive the restore.
    res = sql "SHOW INDEXES FROM ${newDbName}.${tableName}"
    assertTrue(checkNgramBf(res));
    res = sql "SHOW CREATE TABLE ${newDbName}.${tableName}"
    assertTrue(checkBloomFilter(res));

    // Add a second NGRAM_BF index on the source table, so its schema
    // (table signature) now differs from the already-restored copy.
    sql """
        ALTER TABLE ${dbName}.${tableName}
        ADD INDEX idx_only4test(`only4test`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256")
        """
    // True when SHOW INDEXES lists the newly added idx_only4test index.
    def checkNgramBf1 = { inputRes -> Boolean
        for (List<Object> row : inputRes) {
            if (row[2] == "idx_only4test" && row[10] == "NGRAM_BF") {
                return true
            }
        }
        return false
    }

    // ALTER ... ADD INDEX is asynchronous: poll once per second until the
    // new index is visible, giving up after ~30 seconds.
    int count = 0;
    while (true) {
        res = sql "SHOW INDEXES FROM ${dbName}.${tableName}"
        if (checkNgramBf1(res)) {
            break
        }
        count += 1;
        if (count >= 30) {
            throw new IllegalStateException("alter table add index timeouted")
        }
        Thread.sleep(1000);
    }

    // Take a second full backup under a new snapshot label.
    snapshotName = "${snapshotName}_1"
    sql """
        BACKUP SNAPSHOT ${dbName}.${snapshotName}
        TO `${repoName}`
        ON (${tableName})
        PROPERTIES ("type" = "full")
        """

    syncer.waitSnapshotFinish(dbName)

    snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)

    assertTrue(snapshot != null)

    // Attempt to restore over the existing table, whose indexes now differ
    // from the snapshot; the restore job is expected to fail.
    sql """
        RESTORE SNAPSHOT ${newDbName}.${snapshotName}
        FROM `${repoName}`
        ON ( `${tableName}`)
        PROPERTIES
        (
            "backup_timestamp" = "${snapshot}",
            "reserve_replica" = "true"
        )
        """

    syncer.waitAllRestoreFinish(newDbName)

    // The restore job status must report the schema mismatch rather than
    // silently overwriting the table.
    res = sql_return_maparray "SHOW RESTORE FROM ${newDbName} WHERE LABEL = '${snapshotName}'"
    logger.info("result is {}", res);
    assertTrue(res[0].Status.contains("already exist but with different schema"));

    // Cleanup.
    sql "DROP TABLE ${dbName}.${tableName} FORCE"
    sql "DROP TABLE ${newDbName}.${tableName} FORCE"
    sql "DROP DATABASE ${dbName} FORCE"
    sql "DROP DATABASE ${newDbName} FORCE"
    sql "DROP REPOSITORY `${repoName}`"

}