Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ Status SnapshotLoader::remote_http_download(
}

// Step 3: Validate remote tablet snapshot paths && remote files map
// TODO(Drogon): Add md5sum check
// key is remote snapshot paths, value is filelist
// get all these use http download action
// http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,20 @@ public List<Long> getDroppedPartitions(long dbId) {
}
}

// get the dropped tables of the db.
public List<Long> getDroppedTables(long dbId) {
lock.readLock().lock();
try {
DBBinlog dbBinlog = dbBinlogMap.get(dbId);
if (dbBinlog == null) {
return Lists.newArrayList();
}
return dbBinlog.getDroppedTables();
} finally {
lock.readLock().unlock();
}
}

public List<BinlogTombstone> gc() {
LOG.info("begin gc binlog");

Expand Down
50 changes: 40 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class DBBinlog {

// The commit seq of the dropped partitions
private List<Pair<Long, Long>> droppedPartitions;
// The commit seq of the dropped tables
private List<Pair<Long, Long>> droppedTables;

private List<TBinlog> tableDummyBinlogs;

Expand All @@ -79,6 +81,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
tableBinlogMap = Maps.newHashMap();
timestamps = Lists.newArrayList();
droppedPartitions = Lists.newArrayList();
droppedTables = Lists.newArrayList();

TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
Expand Down Expand Up @@ -121,6 +124,11 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {
if (info != null && info.getPartitionId() > 0) {
droppedPartitions.add(Pair.of(info.getPartitionId(), binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.DROP_TABLE) {
DropTableRecord record = DropTableRecord.fromJson(binlog.data);
if (record != null && record.getTableId() > 0) {
droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq()));
}
}

if (tableIds == null) {
Expand Down Expand Up @@ -174,6 +182,19 @@ public void addBinlog(TBinlog binlog, Object raw) {
if (!binlog.isSetType()) {
return;
}

if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
long tableId = ((DropTableRecord) raw).getTableId();
if (tableId > 0) {
droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
}
}

switch (binlog.getType()) {
case CREATE_TABLE:
return;
Expand All @@ -183,13 +204,6 @@ public void addBinlog(TBinlog binlog, Object raw) {
break;
}

if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
}

for (long tableId : tableIds) {
TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable);
if (tableBinlog != null) {
Expand Down Expand Up @@ -237,6 +251,18 @@ public List<Long> getDroppedPartitions() {
}
}

// Get the dropped tables of the db.
public List<Long> getDroppedTables() {
lock.readLock().lock();
try {
return droppedTables.stream()
.map(v -> v.first)
.collect(Collectors.toList());
} finally {
lock.readLock().unlock();
}
}

public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
Expand Down Expand Up @@ -354,7 +380,7 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) {
}
}

gcDroppedPartitions(largestExpiredCommitSeq);
gcDroppedPartitionAndTables(largestExpiredCommitSeq);
if (lastCommitSeq != -1) {
dummy.setCommitSeq(lastCommitSeq);
}
Expand Down Expand Up @@ -392,7 +418,7 @@ private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
timeIter.remove();
}

gcDroppedPartitions(lastExpiredBinlog.getCommitSeq());
gcDroppedPartitionAndTables(lastExpiredBinlog.getCommitSeq());
}

return lastExpiredBinlog;
Expand Down Expand Up @@ -502,11 +528,15 @@ public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
}
}

private void gcDroppedPartitions(long commitSeq) {
private void gcDroppedPartitionAndTables(long commitSeq) {
Iterator<Pair<Long, Long>> iter = droppedPartitions.iterator();
while (iter.hasNext() && iter.next().second < commitSeq) {
iter.remove();
}
iter = droppedTables.iterator();
while (iter.hasNext() && iter.next().second < commitSeq) {
iter.remove();
}
}

// not thread safety, do this without lock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static DropTableRecord fromJson(String json) {
return GsonUtils.GSON.fromJson(json, DropTableRecord.class);
}

@Override
public String toString() {
return toJson();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6122,6 +6122,7 @@ public static TGetMetaResult getMeta(Database db, List<Table> tables) throws Met
if (Config.enable_feature_binlog) {
BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId()));
dbMeta.setDroppedTables(binlogManager.getDroppedTables(db.getId()));
}

result.setDbMeta(dbMeta);
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,7 @@ struct TGetMetaDBMeta {
2: optional string name
3: optional list<TGetMetaTableMeta> tables
4: optional list<i64> dropped_partitions
5: optional list<i64> dropped_tables
}

struct TGetMetaResult {
Expand Down