Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -3769,22 +3769,35 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws DdlExcept
throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name());
}

if (!db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists())) {
Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
if (!result.first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exists");
}

// we have added these index to memory, only need to persist here
if (getColocateTableIndex().isColocateTable(tableId)) {
GroupId groupId = getColocateTableIndex().getGroup(tableId);
List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
editLog.logColocateAddTable(info);
}
LOG.info("successfully create table[{};{}]", tableName, tableId);
// register or remove table from DynamicPartition after table created
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
if (result.second) {
if (getColocateTableIndex().isColocateTable(tableId)) {
// if this is a colocate join table, its table id is already added to colocate group
// so we should remove the tableId here
getColocateTableIndex().removeTable(tableId);
}
for (Long tabletId : tabletIdSet) {
Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
}
LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId);
} else {
// we have added these index to memory, only need to persist here
if (getColocateTableIndex().isColocateTable(tableId)) {
GroupId groupId = getColocateTableIndex().getGroup(tableId);
List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
editLog.logColocateAddTable(info);
}
LOG.info("successfully create table[{};{}]", tableName, tableId);
// register or remove table from DynamicPartition after table created
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
}
} catch (DdlException e) {
for (Long tabletId : tabletIdSet) {
Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
Expand All @@ -3807,7 +3820,7 @@ private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlExcep
long tableId = Catalog.getCurrentCatalog().getNextId();
MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
mysqlTable.setComment(stmt.getComment());
if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists())) {
if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Expand All @@ -3822,7 +3835,7 @@ private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlExcept
long tableId = Catalog.getCurrentCatalog().getNextId();
OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
odbcTable.setComment(stmt.getComment());
if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists())) {
if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Expand Down Expand Up @@ -3853,7 +3866,7 @@ private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlExcepti
EsTable esTable = new EsTable(tableId, tableName, baseSchema, stmt.getProperties(), partitionInfo);
esTable.setComment(stmt.getComment());

if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists())) {
if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table{} with id {}", tableName, tableId);
Expand All @@ -3870,7 +3883,7 @@ private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlExce
brokerTable.setComment(stmt.getComment());
brokerTable.setBrokerProperties(stmt.getExtProperties());

if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists())) {
if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Expand All @@ -3884,7 +3897,7 @@ private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlExcept
long tableId = getNextId();
HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
hiveTable.setComment(stmt.getComment());
if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists())) {
if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Expand Down Expand Up @@ -5531,7 +5544,7 @@ public void createView(CreateViewStmt stmt) throws DdlException {
throw new DdlException("failed to init view stmt", e);
}

if (!db.createTableWithLock(newView, false, stmt.isSetIfNotExists())) {
if (!db.createTableWithLock(newView, false, stmt.isSetIfNotExists()).first) {
throw new DdlException("Failed to create view[" + tableName + "].");
}
LOG.info("successfully create view[" + tableName + "-" + newView.getId() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,4 +700,10 @@ public void setBackendsSetByIdxForGroup(GroupId groupId, int tabletOrderIdx, Set
writeUnlock();
}
}

// just for ut
public Map<Long, GroupId> getTable2Group() {
return table2Group;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,17 @@ public void checkQuota() throws DdlException {
checkReplicaQuota();
}

public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
boolean result = true;
// if a table is already exists, then edit log won't be executed
// some caller of this method may need to know this message
boolean isTableExist = false;
writeLock();
try {
String tableName = table.getName();
if (nameToTable.containsKey(tableName)) {
result = setIfNotExist;
isTableExist = true;
} else {
idToTable.put(table.getId(), table);
nameToTable.put(table.getName(), table);
Expand All @@ -301,7 +305,7 @@ public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfN
Catalog.getCurrentCatalog().getEsRepository().registerTable((EsTable)table);
}
}
return result;
return Pair.create(result, isTableExist);
} finally {
writeUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.catalog;

import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Pair;
import org.apache.doris.common.SystemIdGenerator;

import java.io.DataInput;
Expand All @@ -39,8 +40,8 @@ public InfoSchemaDb(String cluster) {
}

@Override
public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
return false;
public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
return Pair.create(false, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,5 +695,25 @@ public PartitionBalanceInfo(PartitionBalanceInfo info) {
this.beByReplicaCount = TreeMultimap.create(info.beByReplicaCount);
}
}

// just for ut
public Table<Long, Long, Replica> getReplicaMetaTable() {
return replicaMetaTable;
}

// just for ut
public Table<Long, Long, Replica> getBackingReplicaMetaTable() {
return backingReplicaMetaTable;
}

// just for ut
public Table<Long, Long, TabletMeta> getTabletMetaTable() {
return tabletMetaTable;
}

// just for ut
public Map<Long, TabletMeta> getTabletMetaMap() {
return tabletMetaMap;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.junit.Test;

import java.io.File;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class CreateTableTest {
Expand Down Expand Up @@ -62,6 +64,38 @@ private static void createTable(String sql) throws Exception {
Catalog.getCurrentCatalog().createTable(createTableStmt);
}

@Test
public void testDuplicateCreateTable() throws Exception{
// test
Catalog catalog = Catalog.getCurrentCatalog();
String sql = "create table if not exists test.tbl1\n" + "(k1 int, k2 int)\n" + "duplicate key(k1)\n"
+ "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1','colocate_with'='test'); ";
createTable(sql);
Set<Long> tabletIdSetAfterCreateFirstTable = catalog.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet();
Set<TabletMeta> tabletMetaSetBeforeCreateFirstTable = new HashSet<>();
catalog.getTabletInvertedIndex().getTabletMetaTable().values().forEach(tabletMeta -> {tabletMetaSetBeforeCreateFirstTable.add(tabletMeta);});
Set<Long> colocateTableIdBeforeCreateFirstTable = catalog.getColocateTableIndex().getTable2Group().keySet();
Assert.assertTrue(colocateTableIdBeforeCreateFirstTable.size() > 0);
Assert.assertTrue(tabletIdSetAfterCreateFirstTable.size() > 0);

createTable(sql);
// check whether tablet is cleared after duplicate create table
Set<Long> tabletIdSetAfterDuplicateCreateTable1 = catalog.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet();
Set<Long> tabletIdSetAfterDuplicateCreateTable2 = catalog.getTabletInvertedIndex().getBackingReplicaMetaTable().columnKeySet();
Set<Long> tabletIdSetAfterDuplicateCreateTable3 = catalog.getTabletInvertedIndex().getTabletMetaMap().keySet();
Set<TabletMeta> tabletIdSetAfterDuplicateCreateTable4 = new HashSet<>();
catalog.getTabletInvertedIndex().getTabletMetaTable().values().forEach(tabletMeta -> {tabletIdSetAfterDuplicateCreateTable4.add(tabletMeta);});

Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable1));
Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable2));
Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable3));
Assert.assertTrue(tabletMetaSetBeforeCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable4));

// check whether table id is cleared from colocate group after duplicate create table
Set<Long> colocateTableIdAfterCreateFirstTable = catalog.getColocateTableIndex().getTable2Group().keySet();
Assert.assertTrue(colocateTableIdBeforeCreateFirstTable.equals(colocateTableIdAfterCreateFirstTable));
}

@Test
public void testNormal() throws DdlException {
ExceptionChecker.expectThrowsNoException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void testNormal() throws IOException {
Database db = new InfoSchemaDb();

Assert.assertFalse(db.createTable(null));
Assert.assertFalse(db.createTableWithLock(null, false, false));
Assert.assertFalse(db.createTableWithLock(null, false, false).first);
db.dropTable("authors");
db.write(null);
Assert.assertNull(db.getTable("authors"));
Expand Down