diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index bbfa2f30e3a22e..5deaf8b51d56ca 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2417,7 +2417,7 @@ public class Config extends ConfigBase { public static int max_binlog_messsage_size = 1024 * 1024 * 1024; @ConfField(mutable = true, masterOnly = true, description = { - "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。", + "是否禁止使用 WITH RESOURCE 语句创建 Catalog。", "Whether to disable creating catalog with WITH RESOURCE statement."}) public static boolean disallow_create_catalog_with_resource = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 2403ede6fb3082..d4bf72a1e9cc8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -112,6 +112,10 @@ private void addBinlog(TBinlog binlog, Object raw) { return; } + LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {}, type {}, data {}", + binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(), binlog.getType(), + binlog.getData()); + DBBinlog dbBinlog; lock.writeLock().lock(); try { @@ -589,7 +593,6 @@ public List gc() { return tombstones; } - public void replayGc(BinlogGcInfo binlogGcInfo) { lock.writeLock().lock(); Map gcDbBinlogMap; diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 675c9dc78a824e..bd7fd184426954 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -43,6 +43,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -288,7 +289,6 @@ public Pair getBinlogLag(long tableId, long prevCommitSe public Pair lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) { TableBinlog tableBinlog = null; - lock.writeLock().lock(); try { if (tableId < 0) { @@ -457,20 +457,43 @@ private TBinlog getLastExpiredBinlog(BinlogComparator checker) { } if (lastExpiredBinlog != null) { - dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq()); + final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq(); + dummy.setCommitSeq(expiredCommitSeq); // release expired timestamps by commit seq. Iterator> timeIter = timestamps.iterator(); - while (timeIter.hasNext() && timeIter.next().first <= lastExpiredBinlog.getCommitSeq()) { + while (timeIter.hasNext() && timeIter.next().first <= expiredCommitSeq) { timeIter.remove(); } - gcDroppedResources(lastExpiredBinlog.getCommitSeq()); + lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= expiredCommitSeq); + gcDroppedResources(expiredCommitSeq); } return lastExpiredBinlog; } + private Optional getMinLockedCommitSeq() { + lock.readLock().lock(); + try { + Optional minLockedCommitSeq = lockedBinlogs.values().stream().min(Long::compareTo); + for (TableBinlog tableBinlog : tableBinlogMap.values()) { + Optional tableMinLockedCommitSeq = tableBinlog.getMinLockedCommitSeq(); + if (!tableMinLockedCommitSeq.isPresent()) { + continue; + } + if (minLockedCommitSeq.isPresent()) { + minLockedCommitSeq = Optional.of(Math.min(minLockedCommitSeq.get(), tableMinLockedCommitSeq.get())); + } else { + minLockedCommitSeq = tableMinLockedCommitSeq; + } + } + return minLockedCommitSeq; + } finally { + lock.readLock().unlock(); + } + } + private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) { long ttlSeconds = dbBinlogConfig.getTtlSeconds(); long maxBytes = dbBinlogConfig.getMaxBytes(); @@ -481,10 +504,12 @@ private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) { dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums); // step 1: get current tableBinlog info and expiredCommitSeq + Optional minLockedCommitSeq = getMinLockedCommitSeq(); TBinlog lastExpiredBinlog = null; + List tableBinlogs = Lists.newArrayList(); lock.writeLock().lock(); try { - long expiredCommitSeq = -1; + long expiredCommitSeq = -1L; Iterator> timeIter = timestamps.iterator(); while (timeIter.hasNext()) { Pair pair = timeIter.next(); @@ -494,6 +519,13 @@ private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) { expiredCommitSeq = pair.first; } + // Speed up gc by recycling binlogs that are not locked by syncer. + // To keep compatible with the old version, if no binlog is locked here, fallthrough to the + // previous behavior (keep the entire binlogs until it is expired). + if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < minLockedCommitSeq.get()) { + expiredCommitSeq = minLockedCommitSeq.get() - 1L; + } + final long lastExpiredCommitSeq = expiredCommitSeq; BinlogComparator checker = (binlog) -> { // NOTE: TreeSet read size during iterator remove is valid. @@ -507,6 +539,7 @@ private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) { || maxHistoryNums < allBinlogs.size(); }; lastExpiredBinlog = getLastExpiredBinlog(checker); + tableBinlogs.addAll(tableBinlogMap.values()); } finally { lock.writeLock().unlock(); } @@ -518,7 +551,7 @@ private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) { // step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db // tombstone List tableTombstones = Lists.newArrayList(); - for (TableBinlog tableBinlog : tableBinlogMap.values()) { + for (TableBinlog tableBinlog : tableBinlogs) { // step 2.1: gc tableBinlog,and get table tombstone BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq()); if (tableTombstone != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index b6cf328ecccbdc..cef60c85ac4228 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -39,6 +39,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -171,6 +172,15 @@ private Pair lockTableBinlog(String jobUniqueId, long lockCommitS return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq); } + public Optional getMinLockedCommitSeq() { + lock.readLock().lock(); + try { + return lockedBinlogs.values().stream().min(Long::compareTo); + } finally { + lock.readLock().unlock(); + } + } + private Pair getLastUpsertAndLargestCommitSeq(BinlogComparator checker) { if (binlogs.size() <= 1) { return null; @@ -199,7 +209,7 @@ private Pair getLastUpsertAndLargestCommitSeq(BinlogComparator ch return null; } - long expiredCommitSeq = lastExpiredBinlog.getCommitSeq(); + final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq(); dummyBinlog.setCommitSeq(expiredCommitSeq); Iterator> timeIterator = timestamps.iterator(); @@ -207,6 +217,7 @@ private Pair getLastUpsertAndLargestCommitSeq(BinlogComparator ch timeIterator.remove(); } + lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= expiredCommitSeq); return Pair.of(tombstoneUpsert, expiredCommitSeq); } @@ -279,6 +290,13 @@ public BinlogTombstone gc() { } expiredCommitSeq = entry.first; } + + // find the min locked binlog commit seq, if not exists, use the last binlog commit seq. + Optional minLockedCommitSeq = lockedBinlogs.values().stream().min(Long::compareTo); + if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < minLockedCommitSeq.get()) { + // Speed up the gc progress by the min locked commit seq. + expiredCommitSeq = minLockedCommitSeq.get() - 1L; + } } final long lastExpiredCommitSeq = expiredCommitSeq;