diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java index 9e35cc3bd61081..edc01782f3159a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java @@ -20,5 +20,5 @@ import org.apache.doris.thrift.TBinlog; public interface BinlogComparator { - boolean isExpired(TBinlog binlog, long expired); + boolean isExpired(TBinlog binlog); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java index 30641bae8c6f27..b07f5e5d87ce41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java @@ -41,6 +41,8 @@ public BinlogConfigCache() { lock = new ReentrantReadWriteLock(); } + // Get the binlog config of the specified db, return null if no such database + // exists. public BinlogConfig getDBBinlogConfig(long dbId) { lock.readLock().lock(); BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId); @@ -110,7 +112,8 @@ public BinlogConfig getTableBinlogConfig(long dbId, long tableId) { OlapTable olapTable = (OlapTable) table; tableBinlogConfig = olapTable.getBinlogConfig(); // get table binlog config, when table modify binlogConfig - // it create a new binlog, not update inplace, so we don't need to clone binlogConfig + // it create a new binlog, not update inplace, so we don't need to clone + // binlogConfig dbTableBinlogEnableMap.put(tableId, tableBinlogConfig); return tableBinlogConfig; } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java index 468e5a08186422..ca43f9312ffc86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java @@ -56,7 +56,7 @@ protected void runAfterCatalogReady() { try { List tombstones = Env.getCurrentEnv().getBinlogManager().gc(); if (tombstones != null && !tombstones.isEmpty()) { - LOG.info("tomebstones size: {}", tombstones.size()); + LOG.info("tombstones size: {}", tombstones.size()); } else { LOG.info("no gc binlog"); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 96d0f7f4e13017..454f678e2e1ad1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -58,9 +58,10 @@ public class BinlogManager { private static final int BUFFER_SIZE = 16 * 1024; private static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("Name") - .add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime") + .add("Type").add("Id").add("Dropped").add("BinlogLength").add("BinlogSize").add("FirstBinlogCommittedTime") .add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime") - .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds") + .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds").add("BinlogMaxBytes") + .add("BinlogMaxHistoryNums") .build(); private static final Logger LOG = LogManager.getLogger(BinlogManager.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index 0f6c2308248931..6b79fab143b595 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -81,6 +81,7 @@ public static TBinlog newDummyBinlog(long dbId, long tableId) { return dummy; } + // Compute the expired timestamp in milliseconds. public static long getExpiredMs(long ttlSeconds) { long currentSeconds = System.currentTimeMillis() / 1000; if (currentSeconds < ttlSeconds) { @@ -94,4 +95,11 @@ public static long getExpiredMs(long ttlSeconds) { public static String convertTimeToReadable(long time) { return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(time)); } + + public static long getApproximateMemoryUsage(TBinlog binlog) { + /* object layout: header + body + padding */ + final long objSize = 80; // 9 fields and 1 header + String data = binlog.getData(); + return objSize + binlog.getTableIdsSize() * 8 + (data == null ? 0 : data.length()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index a3133bfb5c7828..79e1adf20c9dfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -45,6 +45,8 @@ public class DBBinlog { private static final Logger LOG = LogManager.getLogger(BinlogManager.class); private long dbId; + // The size of all binlogs. + private long binlogSize; // guard for allBinlogs && tableBinlogMap private ReentrantReadWriteLock lock; // all binlogs contain table binlogs && create table binlog etc ... @@ -64,6 +66,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) { lock = new ReentrantReadWriteLock(); this.dbId = binlog.getDbId(); this.binlogConfigCache = binlogConfigCache; + this.binlogSize = 0; // allBinlogs treeset order by commitSeq allBinlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); @@ -81,7 +84,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) { } public static DBBinlog recoverDbBinlog(BinlogConfigCache binlogConfigCache, TBinlog dbDummy, - List tableDummies, boolean dbBinlogEnable) { + List tableDummies, boolean dbBinlogEnable) { DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy); long dbId = dbDummy.getDbId(); for (TBinlog tableDummy : tableDummies) { @@ -105,6 +108,7 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) { } allBinlogs.add(binlog); + binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); if (tableIds == null) { return; @@ -119,12 +123,13 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) { } } - // TODO(Drogon): remove TableBinlog after DropTable, think table drop && recovery + // TODO(Drogon): remove TableBinlog after DropTable, think table drop && + // recovery private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlogEnable) { TableBinlog tableBinlog = tableBinlogMap.get(tableId); if (tableBinlog == null) { if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId, tableId)) { - tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId); + tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId); tableBinlogMap.put(tableId, tableBinlog); tableDummyBinlogs.add(tableBinlog.getDummyBinlog()); } @@ -132,7 +137,8 @@ private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlo return tableBinlog; } - // guard by BinlogManager, if addBinlog called, more than one(db/tables) enable binlog + // guard by BinlogManager, if addBinlog called, more than one(db/tables) enable + // binlog public void addBinlog(TBinlog binlog) { boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId); List tableIds = binlog.getTableIds(); @@ -140,6 +146,7 @@ public void addBinlog(TBinlog binlog) { lock.writeLock().lock(); try { allBinlogs.add(binlog); + binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); if (binlog.getTimestamp() > 0 && dbBinlogEnable) { timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); @@ -226,14 +233,10 @@ public BinlogTombstone gc() { return null; } - boolean dbBinlogEnable = dbBinlogConfig.isEnable(); BinlogTombstone tombstone; - if (dbBinlogEnable) { + if (dbBinlogConfig.isEnable()) { // db binlog is enabled, only one binlogTombstones - long ttlSeconds = dbBinlogConfig.getTtlSeconds(); - long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); - - tombstone = dbBinlogEnableGc(expiredMs); + tombstone = dbBinlogEnableGc(dbBinlogConfig); } else { tombstone = dbBinlogDisableGc(); } @@ -277,7 +280,7 @@ private BinlogTombstone dbBinlogDisableGc() { } for (TableBinlog tableBinlog : tableBinlogs) { - BinlogTombstone tombstone = tableBinlog.ttlGc(); + BinlogTombstone tombstone = tableBinlog.gc(); if (tombstone != null) { tombstones.add(tombstone); } @@ -297,6 +300,7 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) { TBinlog dummy = binlogIter.next(); boolean foundFirstUsingBinlog = false; long lastCommitSeq = -1; + long removed = 0; while (binlogIter.hasNext()) { TBinlog binlog = binlogIter.next(); @@ -304,6 +308,8 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) { if (commitSeq <= largestExpiredCommitSeq) { if (binlog.table_ref <= 0) { binlogIter.remove(); + binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog); + ++removed; if (!foundFirstUsingBinlog) { lastCommitSeq = commitSeq; } @@ -318,52 +324,92 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) { if (lastCommitSeq != -1) { dummy.setCommitSeq(lastCommitSeq); } + + LOG.info("remove {} expired binlogs, dbId: {}, left: {}", removed, dbId, allBinlogs.size()); } finally { lock.writeLock().unlock(); } } - private BinlogTombstone dbBinlogEnableGc(long expiredMs) { + private TBinlog getLastExpiredBinlog(BinlogComparator checker) { + TBinlog lastExpiredBinlog = null; + + Iterator binlogIter = allBinlogs.iterator(); + TBinlog dummy = binlogIter.next(); + while (binlogIter.hasNext()) { + TBinlog binlog = binlogIter.next(); + if (checker.isExpired(binlog)) { + binlogIter.remove(); + binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog); + lastExpiredBinlog = binlog; + } else { + break; + } + } + + if (lastExpiredBinlog != null) { + dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq()); + + // release expired timestamps by commit seq. + Iterator> timeIter = timestamps.iterator(); + while (timeIter.hasNext() && timeIter.next().first <= lastExpiredBinlog.getCommitSeq()) { + timeIter.remove(); + } + } + + return lastExpiredBinlog; + } + + private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) { + long ttlSeconds = dbBinlogConfig.getTtlSeconds(); + long maxBytes = dbBinlogConfig.getMaxBytes(); + long maxHistoryNums = dbBinlogConfig.getMaxHistoryNums(); + long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); + + LOG.info("gc db binlog. dbId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, maxHistoryNums: {}", + dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums); + // step 1: get current tableBinlog info and expiredCommitSeq - long expiredCommitSeq = -1; + TBinlog lastExpiredBinlog = null; lock.writeLock().lock(); try { + long expiredCommitSeq = -1; Iterator> timeIter = timestamps.iterator(); while (timeIter.hasNext()) { Pair pair = timeIter.next(); - if (pair.second <= expiredMs) { - expiredCommitSeq = pair.first; - timeIter.remove(); - } else { + if (pair.second > expiredMs) { break; } + expiredCommitSeq = pair.first; } - Iterator binlogIter = allBinlogs.iterator(); - TBinlog dummy = binlogIter.next(); - dummy.setCommitSeq(expiredCommitSeq); - - while (binlogIter.hasNext()) { - TBinlog binlog = binlogIter.next(); - if (binlog.getCommitSeq() <= expiredCommitSeq) { - binlogIter.remove(); - } else { - break; - } - } + final long lastExpiredCommitSeq = expiredCommitSeq; + BinlogComparator checker = (binlog) -> { + // NOTE: TreeSet read size during iterator remove is valid. + // + // The expired conditions in order: + // 1. expired time + // 2. the max bytes + // 3. the max history num + return binlog.getCommitSeq() <= lastExpiredCommitSeq + || maxBytes < binlogSize + || maxHistoryNums < allBinlogs.size(); + }; + lastExpiredBinlog = getLastExpiredBinlog(checker); } finally { lock.writeLock().unlock(); } - if (expiredCommitSeq == -1) { + if (lastExpiredBinlog == null) { return null; } - // step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db tombstone + // step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db + // tombstone List tableTombstones = Lists.newArrayList(); for (TableBinlog tableBinlog : tableBinlogMap.values()) { // step 2.1: gc tableBinlog,and get table tombstone - BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(expiredCommitSeq); + BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq()); if (tableTombstone != null) { tableTombstones.add(tableTombstone); } @@ -386,28 +432,8 @@ public void dbBinlogEnableReplayGc(BinlogTombstone tombstone) { lock.writeLock().lock(); try { - Iterator> timeIter = timestamps.iterator(); - while (timeIter.hasNext()) { - long commitSeq = timeIter.next().first; - if (commitSeq <= largestExpiredCommitSeq) { - timeIter.remove(); - } else { - break; - } - } - - Iterator binlogIter = allBinlogs.iterator(); - TBinlog dummy = binlogIter.next(); - dummy.setCommitSeq(largestExpiredCommitSeq); - - while (binlogIter.hasNext()) { - TBinlog binlog = binlogIter.next(); - if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { - binlogIter.remove(); - } else { - break; - } - } + BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= largestExpiredCommitSeq; + getLastExpiredBinlog(checker); } finally { lock.writeLock().unlock(); } @@ -478,6 +504,8 @@ public void getBinlogInfo(BaseProcResult result) { info.add(dropped); String binlogLength = String.valueOf(allBinlogs.size()); info.add(binlogLength); + String binlogSize = String.valueOf(this.binlogSize); + info.add(binlogSize); String firstBinlogCommittedTime = null; String readableFirstBinlogCommittedTime = null; if (!timestamps.isEmpty()) { @@ -497,10 +525,16 @@ public void getBinlogInfo(BaseProcResult result) { info.add(lastBinlogCommittedTime); info.add(readableLastBinlogCommittedTime); String binlogTtlSeconds = null; + String binlogMaxBytes = null; + String binlogMaxHistoryNums = null; if (binlogConfig != null) { binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds()); + binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes()); + binlogMaxHistoryNums = String.valueOf(binlogConfig.getMaxHistoryNums()); } info.add(binlogTtlSeconds); + info.add(binlogMaxBytes); + info.add(binlogMaxHistoryNums); result.addRow(info); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 3dd290a07f81ee..36ec4f733eafbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -27,6 +27,7 @@ import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -43,15 +44,23 @@ public class TableBinlog { private long dbId; private long tableId; + private long binlogSize; private ReentrantReadWriteLock lock; private TreeSet binlogs; + + // Pair(commitSeq, timestamp), used for gc + // need UpsertRecord to add timestamps for gc + private List> timestamps; + private BinlogConfigCache binlogConfigCache; public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) { this.dbId = dbId; this.tableId = tableId; + this.binlogSize = 0; lock = new ReentrantReadWriteLock(); binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); + timestamps = Lists.newArrayList(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -77,6 +86,10 @@ public void recoverBinlog(TBinlog binlog) { if (binlog.getCommitSeq() > dummy.getCommitSeq()) { binlogs.add(binlog); ++binlog.table_ref; + binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); + if (binlog.getTimestamp() > 0) { + timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); + } } } @@ -85,6 +98,10 @@ public void addBinlog(TBinlog binlog) { try { binlogs.add(binlog); ++binlog.table_ref; + binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); + if (binlog.getTimestamp() > 0) { + timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); + } } finally { lock.writeLock().unlock(); } @@ -108,7 +125,7 @@ public Pair getBinlogLag(long prevCommitSeq) { } } - private Pair getLastUpsertAndLargestCommitSeq(long expired, BinlogComparator checker) { + private Pair getLastUpsertAndLargestCommitSeq(BinlogComparator checker) { if (binlogs.size() <= 1) { return null; } @@ -119,9 +136,10 @@ private Pair getLastUpsertAndLargestCommitSeq(long expired, Binlo TBinlog lastExpiredBinlog = null; while (iter.hasNext()) { TBinlog binlog = iter.next(); - if (checker.isExpired(binlog, expired)) { + if (checker.isExpired(binlog)) { lastExpiredBinlog = binlog; --binlog.table_ref; + binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog); if (binlog.getType() == TBinlogType.UPSERT) { tombstoneUpsert = binlog; } @@ -135,9 +153,15 @@ private Pair getLastUpsertAndLargestCommitSeq(long expired, Binlo return null; } - dummyBinlog.setCommitSeq(lastExpiredBinlog.getCommitSeq()); + long expiredCommitSeq = lastExpiredBinlog.getCommitSeq(); + dummyBinlog.setCommitSeq(expiredCommitSeq); - return Pair.of(tombstoneUpsert, lastExpiredBinlog.getCommitSeq()); + Iterator> timeIterator = timestamps.iterator(); + while (timeIterator.hasNext() && timeIterator.next().first <= expiredCommitSeq) { + timeIterator.remove(); + } + + return Pair.of(tombstoneUpsert, expiredCommitSeq); } // this method call when db binlog enable @@ -147,8 +171,8 @@ public BinlogTombstone commitSeqGc(long expiredCommitSeq) { // step 1: get tombstoneUpsertBinlog and dummyBinlog lock.writeLock().lock(); try { - BinlogComparator check = (binlog, expire) -> binlog.getCommitSeq() <= expire; - tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredCommitSeq, check); + BinlogComparator check = (binlog) -> binlog.getCommitSeq() <= expiredCommitSeq; + tombstoneInfo = getLastUpsertAndLargestCommitSeq(check); } finally { lock.writeLock().unlock(); } @@ -171,7 +195,7 @@ public BinlogTombstone commitSeqGc(long expiredCommitSeq) { } // this method call when db binlog disable - public BinlogTombstone ttlGc() { + public BinlogTombstone gc() { // step 1: get expire time BinlogConfig tableBinlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId); if (tableBinlogConfig == null) { @@ -179,19 +203,43 @@ public BinlogTombstone ttlGc() { } long ttlSeconds = tableBinlogConfig.getTtlSeconds(); + long maxBytes = tableBinlogConfig.getMaxBytes(); + long maxHistoryNums = tableBinlogConfig.getMaxHistoryNums(); long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); - if (expiredMs < 0) { - return null; - } - LOG.info("ttl gc. dbId: {}, tableId: {}, expiredMs: {}", dbId, tableId, expiredMs); + LOG.info( + "gc table binlog. dbId: {}, tableId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, " + + "maxHistoryNums: {}, now: {}", + dbId, tableId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums, System.currentTimeMillis()); // step 2: get tombstoneUpsertBinlog and dummyBinlog Pair tombstoneInfo; lock.writeLock().lock(); try { - BinlogComparator check = (binlog, expire) -> binlog.getTimestamp() <= expire; - tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredMs, check); + // find the last expired commit seq. + long expiredCommitSeq = -1; + Iterator> timeIterator = timestamps.iterator(); + while (timeIterator.hasNext()) { + Pair entry = timeIterator.next(); + if (expiredMs < entry.second) { + break; + } + expiredCommitSeq = entry.first; + } + + final long lastExpiredCommitSeq = expiredCommitSeq; + BinlogComparator check = (binlog) -> { + // NOTE: TreeSet read size during iterator remove is valid. + // + // The expired conditions in order: + // 1. expired time + // 2. the max bytes + // 3. the max history num + return binlog.getCommitSeq() <= lastExpiredCommitSeq + || maxBytes < binlogSize + || maxHistoryNums < binlogs.size(); + }; + tombstoneInfo = getLastUpsertAndLargestCommitSeq(check); } finally { lock.writeLock().unlock(); } @@ -216,25 +264,8 @@ public BinlogTombstone ttlGc() { public void replayGc(long largestExpiredCommitSeq) { lock.writeLock().lock(); try { - long lastSeq = -1; - Iterator iter = binlogs.iterator(); - TBinlog dummyBinlog = iter.next(); - - while (iter.hasNext()) { - TBinlog binlog = iter.next(); - long commitSeq = binlog.getCommitSeq(); - if (commitSeq <= largestExpiredCommitSeq) { - lastSeq = commitSeq; - --binlog.table_ref; - iter.remove(); - } else { - break; - } - } - - if (lastSeq != -1) { - dummyBinlog.setCommitSeq(lastSeq); - } + BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= largestExpiredCommitSeq; + getLastUpsertAndLargestCommitSeq(checker); } finally { lock.writeLock().unlock(); } @@ -278,6 +309,8 @@ public void getBinlogInfo(Database db, BaseProcResult result) { info.add(dropped); String binlogLength = String.valueOf(binlogs.size()); info.add(binlogLength); + String binlogSize = String.valueOf(this.binlogSize); + info.add(binlogSize); String firstBinlogCommittedTime = null; String readableFirstBinlogCommittedTime = null; for (TBinlog binlog : binlogs) { @@ -305,10 +338,16 @@ public void getBinlogInfo(Database db, BaseProcResult result) { info.add(lastBinlogCommittedTime); info.add(readableLastBinlogCommittedTime); String binlogTtlSeconds = null; + String binlogMaxBytes = null; + String binlogMaxHistoryNums = null; if (binlogConfig != null) { binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds()); + binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes()); + binlogMaxHistoryNums = String.valueOf(binlogConfig.getMaxHistoryNums()); } info.add(binlogTtlSeconds); + info.add(binlogMaxBytes); + info.add(binlogMaxHistoryNums); result.addRow(info); } finally { diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java index 64a539a2b86613..4a94aacf60b5e5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java @@ -276,14 +276,9 @@ public long getExpiredMs(long ttl) { for (Map.Entry> dbEntry : frameWork.entrySet()) { long dbId = dbEntry.getKey(); for (long tableId : dbEntry.getValue()) { - if ((tableId / tableBaseId) % 2 != 0) { - addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); - addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); - ++commitSeq; - } else { - addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0)); - addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0)); - } + addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); + addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); + ++commitSeq; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java index b4ecd8a90c5c73..cd86c5935e16af 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java @@ -75,7 +75,7 @@ public long getExpiredMs(long direct) { } // trigger ttlGc - BinlogTombstone tombstone = tableBinlog.ttlGc(); + BinlogTombstone tombstone = tableBinlog.gc(); // check binlog status for (TBinlog binlog : testBinlogs) {