diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index a5061c4decfb03..1764e3d4322e14 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -464,7 +464,6 @@ Status SnapshotLoader::remote_http_download( } // Step 3: Validate remote tablet snapshot paths && remote files map - // TODO(Drogon): Add md5sum check // key is remote snapshot paths, value is filelist // get all these use http download action // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 77f2bf74e6629d..fc3115e2b92da3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -369,6 +369,20 @@ public List getDroppedPartitions(long dbId) { } } + // get the dropped tables of the db. + public List getDroppedTables(long dbId) { + lock.readLock().lock(); + try { + DBBinlog dbBinlog = dbBinlogMap.get(dbId); + if (dbBinlog == null) { + return Lists.newArrayList(); + } + return dbBinlog.getDroppedTables(); + } finally { + lock.readLock().unlock(); + } + } + public List gc() { LOG.info("begin gc binlog"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index b43805b06d5ceb..502491004e5dab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -62,6 +62,8 @@ public class DBBinlog { // The commit seq of the dropped partitions private List> droppedPartitions; + // The commit seq of the dropped tables + private List> droppedTables; private List tableDummyBinlogs; @@ -79,6 +81,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) { tableBinlogMap = Maps.newHashMap(); timestamps = Lists.newArrayList(); droppedPartitions = Lists.newArrayList(); + droppedTables = Lists.newArrayList(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -121,6 +124,11 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) { if (info != null && info.getPartitionId() > 0) { droppedPartitions.add(Pair.of(info.getPartitionId(), binlog.getCommitSeq())); } + } else if (binlog.getType() == TBinlogType.DROP_TABLE) { + DropTableRecord record = DropTableRecord.fromJson(binlog.data); + if (record != null && record.getTableId() > 0) { + droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq())); + } } if (tableIds == null) { @@ -174,6 +182,19 @@ public void addBinlog(TBinlog binlog, Object raw) { if (!binlog.isSetType()) { return; } + + if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { + long partitionId = ((DropPartitionInfo) raw).getPartitionId(); + if (partitionId > 0) { + droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); + } + } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) { + long tableId = ((DropTableRecord) raw).getTableId(); + if (tableId > 0) { + droppedTables.add(Pair.of(tableId, binlog.getCommitSeq())); + } + } + switch (binlog.getType()) { case CREATE_TABLE: return; @@ -183,13 +204,6 @@ public void addBinlog(TBinlog binlog, Object raw) { break; } - if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { - long partitionId = ((DropPartitionInfo) raw).getPartitionId(); - if (partitionId > 0) { - droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); - } - } - for (long tableId : tableIds) { TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable); if (tableBinlog != null) { @@ -237,6 +251,18 @@ public List getDroppedPartitions() { } } + // Get the dropped tables of the db. + public List getDroppedTables() { + lock.readLock().lock(); + try { + return droppedTables.stream() + .map(v -> v.first) + .collect(Collectors.toList()); + } finally { + lock.readLock().unlock(); + } + } + public Pair getBinlogLag(long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); @@ -354,7 +380,7 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) { } } - gcDroppedPartitions(largestExpiredCommitSeq); + gcDroppedPartitionAndTables(largestExpiredCommitSeq); if (lastCommitSeq != -1) { dummy.setCommitSeq(lastCommitSeq); } @@ -392,7 +418,7 @@ private TBinlog getLastExpiredBinlog(BinlogComparator checker) { timeIter.remove(); } - gcDroppedPartitions(lastExpiredBinlog.getCommitSeq()); + gcDroppedPartitionAndTables(lastExpiredBinlog.getCommitSeq()); } return lastExpiredBinlog; @@ -502,11 +528,15 @@ public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) { } } - private void gcDroppedPartitions(long commitSeq) { + private void gcDroppedPartitionAndTables(long commitSeq) { Iterator> iter = droppedPartitions.iterator(); while (iter.hasNext() && iter.next().second < commitSeq) { iter.remove(); } + iter = droppedTables.iterator(); + while (iter.hasNext() && iter.next().second < commitSeq) { + iter.remove(); + } } // not thread safety, do this without lock diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java index 155898cbc9cf8b..4417edeb97372d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java @@ -58,6 +58,10 @@ public String toJson() { return GsonUtils.GSON.toJson(this); } + public static DropTableRecord fromJson(String json) { + return GsonUtils.GSON.fromJson(json, DropTableRecord.class); + } + @Override public String toString() { return toJson(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f908442361f018..1bb4de57f8dde0 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -6122,6 +6122,7 @@ public static TGetMetaResult getMeta(Database db, List tables) throws Met if (Config.enable_feature_binlog) { BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager(); dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId())); + dbMeta.setDroppedTables(binlogManager.getDroppedTables(db.getId())); } result.setDbMeta(dbMeta); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index d0b45d72647b60..3ad3250bffe56f 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1414,6 +1414,7 @@ struct TGetMetaDBMeta { 2: optional string name 3: optional list tables 4: optional list dropped_partitions + 5: optional list dropped_tables } struct TGetMetaResult {