From f71cd77ff41ad1cbb514825e112e994fe2321f6e Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Wed, 21 Apr 2021 17:37:35 +0800 Subject: [PATCH 1/3] [Bug-fix] lock db when get table from catalog Although the table lock can control the simultaneous modification of the table by different threads. But it cannot control the drop operation of the table by other threads. For example, when drop table and table update occur at the same time. 1. get table object by thread 1 2. drop table by thread 2 with table lock 3. update table object by thread 1 The above process is possible. At this time, step 3 actually operates a table that no longer exists, which will eventually cause the wrong metadata to be recorded. Fixed #5687 --- .../org/apache/doris/alter/AlterHandler.java | 75 ++++++++++--------- .../apache/doris/alter/SchemaChangeJobV2.java | 4 +- .../org/apache/doris/catalog/Database.java | 2 +- 3 files changed, 43 insertions(+), 38 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 98e4e41f8ae92a..45865e87c2cddd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -410,45 +410,50 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce throw new MetaNotFoundException("database " + task.getDbId() + " does not exist"); } - OlapTable tbl = (OlapTable) db.getTableOrThrowException(task.getTableId(), Table.TableType.OLAP); - tbl.writeLock(); + db.readLock(); try { - Partition partition = tbl.getPartition(task.getPartitionId()); - if (partition == null) { - throw new MetaNotFoundException("partition " + task.getPartitionId() + " does not exist"); - } - MaterializedIndex index = partition.getIndex(task.getIndexId()); - if (index == null) { - throw new MetaNotFoundException("index " + task.getIndexId() + " does not exist"); - } - Tablet tablet = index.getTablet(task.getTabletId()); - Preconditions.checkNotNull(tablet, task.getTabletId()); - Replica replica = tablet.getReplicaById(task.getNewReplicaId()); - if (replica == null) { - throw new MetaNotFoundException("replica " + task.getNewReplicaId() + " does not exist"); - } - - LOG.info("before handle alter task tablet {}, replica: {}, task version: {}-{}", - task.getSignature(), replica, task.getVersion(), task.getVersionHash()); - boolean versionChanged = false; - if (replica.getVersion() < task.getVersion()) { - replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount()); - versionChanged = true; - } + OlapTable tbl = (OlapTable) db.getTableOrThrowException(task.getTableId(), Table.TableType.OLAP); + tbl.writeLock(); + try { + Partition partition = tbl.getPartition(task.getPartitionId()); + if (partition == null) { + throw new MetaNotFoundException("partition " + task.getPartitionId() + " does not exist"); + } + MaterializedIndex index = partition.getIndex(task.getIndexId()); + if (index == null) { + throw new MetaNotFoundException("index " + task.getIndexId() + " does not exist"); + } + Tablet tablet = index.getTablet(task.getTabletId()); + Preconditions.checkNotNull(tablet, task.getTabletId()); + Replica replica = tablet.getReplicaById(task.getNewReplicaId()); + if (replica == null) { + throw new MetaNotFoundException("replica " + task.getNewReplicaId() + " does not exist"); + } + + LOG.info("before handle alter task tablet {}, replica: {}, task version: {}-{}", + task.getSignature(), replica, task.getVersion(), task.getVersionHash()); + boolean versionChanged = false; + if (replica.getVersion() < task.getVersion()) { + replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount()); + versionChanged = true; + } - if (versionChanged) { - ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(task.getDbId(), task.getTableId(), - task.getPartitionId(), task.getIndexId(), task.getTabletId(), task.getBackendId(), - replica.getId(), replica.getVersion(), replica.getVersionHash(), -1, - replica.getDataSize(), replica.getRowCount(), - replica.getLastFailedVersion(), replica.getLastFailedVersionHash(), - replica.getLastSuccessVersion(), replica.getLastSuccessVersionHash()); - Catalog.getCurrentCatalog().getEditLog().logUpdateReplica(info); + if (versionChanged) { + ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(task.getDbId(), task.getTableId(), + task.getPartitionId(), task.getIndexId(), task.getTabletId(), task.getBackendId(), + replica.getId(), replica.getVersion(), replica.getVersionHash(), -1, + replica.getDataSize(), replica.getRowCount(), + replica.getLastFailedVersion(), replica.getLastFailedVersionHash(), + replica.getLastSuccessVersion(), replica.getLastSuccessVersionHash()); + Catalog.getCurrentCatalog().getEditLog().logUpdateReplica(info); + } + + LOG.info("after handle alter task tablet: {}, replica: {}", task.getSignature(), replica); + } finally { + tbl.writeUnlock(); } - - LOG.info("after handle alter task tablet: {}, replica: {}", task.getSignature(), replica); } finally { - tbl.writeUnlock(); + db.readUnlock(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 665f37663369f2..bd22ba2a2060e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -706,7 +706,7 @@ private void replayCreateJob(SchemaChangeJobV2 replayedJob) { this.watershedTxnId = replayedJob.watershedTxnId; jobState = JobState.WAITING_TXN; - LOG.info("replay pending schema change job: {}", jobId); + LOG.info("replay pending schema change job: {}, table id: {}", jobId, tableId); } /** @@ -735,7 +735,7 @@ private void replayPendingJob(SchemaChangeJobV2 replayedJob) { // should still be in WAITING_TXN state, so that the alter tasks will be resend again this.jobState = JobState.WAITING_TXN; this.watershedTxnId = replayedJob.watershedTxnId; - LOG.info("replay waiting txn schema change job: {}", jobId); + LOG.info("replay waiting txn schema change job: {} table id: {}", jobId, tableId); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index c1d6f975d36c59..f1b7e829723436 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -433,7 +433,7 @@ public Table getTable(long tableId) { */ public Table getTableOrThrowException(long tableId, TableType tableType) throws MetaNotFoundException { Table table = idToTable.get(tableId); - if(table == null) { + if (table == null) { throw new MetaNotFoundException("unknown table, tableId=" + tableId); } if (table.getType() != tableType) { From c9a37516997de7c3457cf35064ca24bef1c610b2 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 22 Apr 2021 10:41:30 +0800 Subject: [PATCH 2/3] change to ignore dropped table when replay modified table log --- .../org/apache/doris/alter/AlterHandler.java | 73 +++++++++---------- .../org/apache/doris/catalog/Catalog.java | 24 ++++-- .../doris/persist/ReplicaPersistInfo.java | 1 - 3 files changed, 53 insertions(+), 45 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 45865e87c2cddd..4cf206a525ac87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -410,50 +410,45 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce throw new MetaNotFoundException("database " + task.getDbId() + " does not exist"); } - db.readLock(); + OlapTable tbl = (OlapTable) db.getTableOrThrowException(task.getTableId(), Table.TableType.OLAP); + tbl.writeLock(); try { - OlapTable tbl = (OlapTable) db.getTableOrThrowException(task.getTableId(), Table.TableType.OLAP); - tbl.writeLock(); - try { - Partition partition = tbl.getPartition(task.getPartitionId()); - if (partition == null) { - throw new MetaNotFoundException("partition " + task.getPartitionId() + " does not exist"); - } - MaterializedIndex index = partition.getIndex(task.getIndexId()); - if (index == null) { - throw new MetaNotFoundException("index " + task.getIndexId() + " does not exist"); - } - Tablet tablet = index.getTablet(task.getTabletId()); - Preconditions.checkNotNull(tablet, task.getTabletId()); - Replica replica = tablet.getReplicaById(task.getNewReplicaId()); - if (replica == null) { - throw new MetaNotFoundException("replica " + task.getNewReplicaId() + " does not exist"); - } - - LOG.info("before handle alter task tablet {}, replica: {}, task version: {}-{}", - task.getSignature(), replica, task.getVersion(), task.getVersionHash()); - boolean versionChanged = false; - if (replica.getVersion() < task.getVersion()) { - replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount()); - versionChanged = true; - } + Partition partition = tbl.getPartition(task.getPartitionId()); + if (partition == null) { + throw new MetaNotFoundException("partition " + task.getPartitionId() + " does not exist"); + } + MaterializedIndex index = partition.getIndex(task.getIndexId()); + if (index == null) { + throw new MetaNotFoundException("index " + task.getIndexId() + " does not exist"); + } + Tablet tablet = index.getTablet(task.getTabletId()); + Preconditions.checkNotNull(tablet, task.getTabletId()); + Replica replica = tablet.getReplicaById(task.getNewReplicaId()); + if (replica == null) { + throw new MetaNotFoundException("replica " + task.getNewReplicaId() + " does not exist"); + } - if (versionChanged) { - ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(task.getDbId(), task.getTableId(), - task.getPartitionId(), task.getIndexId(), task.getTabletId(), task.getBackendId(), - replica.getId(), replica.getVersion(), replica.getVersionHash(), -1, - replica.getDataSize(), replica.getRowCount(), - replica.getLastFailedVersion(), replica.getLastFailedVersionHash(), - replica.getLastSuccessVersion(), replica.getLastSuccessVersionHash()); - Catalog.getCurrentCatalog().getEditLog().logUpdateReplica(info); - } + LOG.info("before handle alter task tablet {}, replica: {}, task version: {}-{}", + task.getSignature(), replica, task.getVersion(), task.getVersionHash()); + boolean versionChanged = false; + if (replica.getVersion() < task.getVersion()) { + replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount()); + versionChanged = true; + } - LOG.info("after handle alter task tablet: {}, replica: {}", task.getSignature(), replica); - } finally { - tbl.writeUnlock(); + if (versionChanged) { + ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(task.getDbId(), task.getTableId(), + task.getPartitionId(), task.getIndexId(), task.getTabletId(), task.getBackendId(), + replica.getId(), replica.getVersion(), replica.getVersionHash(), -1, + replica.getDataSize(), replica.getRowCount(), + replica.getLastFailedVersion(), replica.getLastFailedVersionHash(), + replica.getLastSuccessVersion(), replica.getLastSuccessVersionHash()); + Catalog.getCurrentCatalog().getEditLog().logUpdateReplica(info); } + + LOG.info("after handle alter task tablet: {}, replica: {}", task.getSignature(), replica); } finally { - db.readUnlock(); + tbl.writeUnlock(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 3ae81163a9c19c..d50a8c605b832b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -147,6 +147,7 @@ import org.apache.doris.load.LoadChecker; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; +import org.apache.doris.load.StreamLoadRecordMgr; import org.apache.doris.load.loadv2.LoadEtlChecker; import org.apache.doris.load.loadv2.LoadJobScheduler; import org.apache.doris.load.loadv2.LoadLoadingChecker; @@ -155,7 +156,6 @@ import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.load.routineload.RoutineLoadScheduler; import org.apache.doris.load.routineload.RoutineLoadTaskScheduler; -import org.apache.doris.load.StreamLoadRecordMgr; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; import org.apache.doris.meta.MetaContext; @@ -225,14 +225,10 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Queues; import com.google.common.collect.Sets; -import com.sleepycat.je.rep.InsufficientLogException; -import com.sleepycat.je.rep.NetworkRestore; -import com.sleepycat.je.rep.NetworkRestoreConfig; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.codehaus.jackson.map.ObjectMapper; import java.io.BufferedInputStream; import java.io.BufferedReader; @@ -262,6 +258,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import org.codehaus.jackson.map.ObjectMapper; + public class Catalog { private static final Logger LOG = LogManager.getLogger(Catalog.class); // 0 ~ 9999 used for qe @@ -4534,6 +4535,19 @@ public void replayAddReplica(ReplicaPersistInfo info) { public void replayUpdateReplica(ReplicaPersistInfo info) { Database db = getDb(info.getDbId()); OlapTable olapTable = (OlapTable) db.getTable(info.getTableId()); + if (olapTable == null) { + /** + * In the following cases, doris may record metadata modification information for a table that no longer exists. + * 1. Thread 1: get TableA object + * 2. Thread 2: lock db and drop table and record edit log of the dropped TableA + * 3. Thread 1: lock table, modify table and record edit log of the modified TableA + * **The modified edit log is after the dropped edit log** + * Because the table has been dropped, the olapTable in here is null when the modified edit log is replayed. + * So in this case, we will ignore the edit log of the modified table after the table is dropped. + */ + LOG.warn("Olap table is null when the update replica log is replayed, {}", info); + return; + } olapTable.writeLock(); try { unprotectUpdateReplica(info); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java index ed9935afb99a43..fdc8a3bab4e049 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java @@ -392,7 +392,6 @@ public String toString() { sb.append("table id: ").append(tableId); sb.append(" partition id: ").append(partitionId); sb.append(" index id: ").append(indexId); - sb.append(" index id: ").append(indexId); sb.append(" tablet id: ").append(tabletId); sb.append(" backend id: ").append(backendId); sb.append(" replica id: ").append(replicaId); From 270ccfbe0ea1c60638b41efae6de74dd25d8cb78 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Mon, 26 Apr 2021 20:20:29 +0800 Subject: [PATCH 3/3] Add update and delete replica --- .../java/org/apache/doris/catalog/Catalog.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index d50a8c605b832b..265d2212dbeff6 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4524,6 +4524,13 @@ private void unprotectUpdateReplica(ReplicaPersistInfo info) { public void replayAddReplica(ReplicaPersistInfo info) { Database db = getDb(info.getDbId()); OlapTable olapTable = (OlapTable) db.getTable(info.getTableId()); + if (olapTable == null) { + /** + * Same as replayUpdateReplica() + */ + LOG.warn("Olap table is null when the add replica log is replayed, {}", info); + return; + } olapTable.writeLock(); try { unprotectAddReplica(info); @@ -4568,6 +4575,13 @@ public void unprotectDeleteReplica(ReplicaPersistInfo info) { public void replayDeleteReplica(ReplicaPersistInfo info) { Database db = getDb(info.getDbId()); OlapTable tbl = (OlapTable) db.getTable(info.getTableId()); + if (tbl == null) { + /** + * Same as replayUpdateReplica() + */ + LOG.warn("Olap table is null when the delete replica log is replayed, {}", info); + return; + } tbl.writeLock(); try { unprotectDeleteReplica(info);