Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
import org.apache.doris.thrift.TBinlog;

public interface BinlogComparator {
boolean isExpired(TBinlog binlog, long expired);
boolean isExpired(TBinlog binlog);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public BinlogConfigCache() {
lock = new ReentrantReadWriteLock();
}

// Get the binlog config of the specified db, return null if no such database
// exists.
public BinlogConfig getDBBinlogConfig(long dbId) {
lock.readLock().lock();
BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId);
Expand Down Expand Up @@ -110,7 +112,8 @@ public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
OlapTable olapTable = (OlapTable) table;
tableBinlogConfig = olapTable.getBinlogConfig();
// get table binlog config, when table modify binlogConfig
// it create a new binlog, not update inplace, so we don't need to clone binlogConfig
// it creates a new binlog instead of updating in place, so we don't need to clone
// binlogConfig
dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
return tableBinlogConfig;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected void runAfterCatalogReady() {
try {
List<BinlogTombstone> tombstones = Env.getCurrentEnv().getBinlogManager().gc();
if (tombstones != null && !tombstones.isEmpty()) {
LOG.info("tomebstones size: {}", tombstones.size());
LOG.info("tombstones size: {}", tombstones.size());
} else {
LOG.info("no gc binlog");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@
public class BinlogManager {
private static final int BUFFER_SIZE = 16 * 1024;
private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("Name")
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime")
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("BinlogSize").add("FirstBinlogCommittedTime")
.add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime")
.add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds")
.add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds").add("BinlogMaxBytes")
.add("BinlogMaxHistoryNums")
.build();

private static final Logger LOG = LogManager.getLogger(BinlogManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public static TBinlog newDummyBinlog(long dbId, long tableId) {
return dummy;
}

// Compute the expired timestamp in milliseconds.
public static long getExpiredMs(long ttlSeconds) {
long currentSeconds = System.currentTimeMillis() / 1000;
if (currentSeconds < ttlSeconds) {
Expand All @@ -94,4 +95,11 @@ public static long getExpiredMs(long ttlSeconds) {
// Shared formatter for readable timestamps. DateTimeFormatter is immutable and
// thread-safe, so one instance replaces the per-call SimpleDateFormat allocation.
private static final java.time.format.DateTimeFormatter READABLE_TIME_FORMATTER =
        java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

// Convert a timestamp in epoch milliseconds to a readable "yyyy-MM-dd HH:mm:ss"
// string, rendered in the JVM default time zone.
public static String convertTimeToReadable(long time) {
    // Resolve the default zone on every call (as the previous implementation did),
    // so a changed default time zone is honoured.
    return READABLE_TIME_FORMATTER.format(
            java.time.Instant.ofEpochMilli(time).atZone(java.time.ZoneId.systemDefault()));
}

// Roughly estimate how much memory a binlog occupies: a fixed object part, plus
// 8 per referenced table id, plus the length of the data string (if any).
public static long getApproximateMemoryUsage(TBinlog binlog) {
    // Fixed part of the object layout (header + body + padding): 9 fields and 1 header.
    final long fixedSize = 80;
    final String payload = binlog.getData();

    long total = fixedSize + binlog.getTableIdsSize() * 8;
    if (payload != null) {
        total += payload.length();
    }
    return total;
}
}
142 changes: 88 additions & 54 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class DBBinlog {
private static final Logger LOG = LogManager.getLogger(BinlogManager.class);

private long dbId;
// The size of all binlogs.
private long binlogSize;
// guard for allBinlogs && tableBinlogMap
private ReentrantReadWriteLock lock;
// all binlogs contain table binlogs && create table binlog etc ...
Expand All @@ -64,6 +66,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
lock = new ReentrantReadWriteLock();
this.dbId = binlog.getDbId();
this.binlogConfigCache = binlogConfigCache;
this.binlogSize = 0;

// allBinlogs treeset order by commitSeq
allBinlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
Expand All @@ -81,7 +84,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
}

public static DBBinlog recoverDbBinlog(BinlogConfigCache binlogConfigCache, TBinlog dbDummy,
List<TBinlog> tableDummies, boolean dbBinlogEnable) {
List<TBinlog> tableDummies, boolean dbBinlogEnable) {
DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy);
long dbId = dbDummy.getDbId();
for (TBinlog tableDummy : tableDummies) {
Expand All @@ -105,6 +108,7 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {
}

allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);

if (tableIds == null) {
return;
Expand All @@ -119,27 +123,30 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {
}
}

// TODO(Drogon): remove TableBinlog after DropTable, think table drop && recovery
// TODO(Drogon): remove TableBinlog after DropTable, think table drop &&
// recovery
private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlogEnable) {
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
if (tableBinlog == null) {
if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId, tableId)) {
tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId);
tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId);
tableBinlogMap.put(tableId, tableBinlog);
tableDummyBinlogs.add(tableBinlog.getDummyBinlog());
}
}
return tableBinlog;
}

// guard by BinlogManager, if addBinlog called, more than one(db/tables) enable binlog
// guard by BinlogManager, if addBinlog called, more than one(db/tables) enable
// binlog
public void addBinlog(TBinlog binlog) {
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
List<Long> tableIds = binlog.getTableIds();

lock.writeLock().lock();
try {
allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);

if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
Expand Down Expand Up @@ -226,14 +233,10 @@ public BinlogTombstone gc() {
return null;
}

boolean dbBinlogEnable = dbBinlogConfig.isEnable();
BinlogTombstone tombstone;
if (dbBinlogEnable) {
if (dbBinlogConfig.isEnable()) {
// db binlog is enabled, only one binlogTombstones
long ttlSeconds = dbBinlogConfig.getTtlSeconds();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);

tombstone = dbBinlogEnableGc(expiredMs);
tombstone = dbBinlogEnableGc(dbBinlogConfig);
} else {
tombstone = dbBinlogDisableGc();
}
Expand Down Expand Up @@ -277,7 +280,7 @@ private BinlogTombstone dbBinlogDisableGc() {
}

for (TableBinlog tableBinlog : tableBinlogs) {
BinlogTombstone tombstone = tableBinlog.ttlGc();
BinlogTombstone tombstone = tableBinlog.gc();
if (tombstone != null) {
tombstones.add(tombstone);
}
Expand All @@ -297,13 +300,16 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) {
TBinlog dummy = binlogIter.next();
boolean foundFirstUsingBinlog = false;
long lastCommitSeq = -1;
long removed = 0;

while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
long commitSeq = binlog.getCommitSeq();
if (commitSeq <= largestExpiredCommitSeq) {
if (binlog.table_ref <= 0) {
binlogIter.remove();
binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
++removed;
if (!foundFirstUsingBinlog) {
lastCommitSeq = commitSeq;
}
Expand All @@ -318,52 +324,92 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) {
if (lastCommitSeq != -1) {
dummy.setCommitSeq(lastCommitSeq);
}

LOG.info("remove {} expired binlogs, dbId: {}, left: {}", removed, dbId, allBinlogs.size());
} finally {
lock.writeLock().unlock();
}
}

private BinlogTombstone dbBinlogEnableGc(long expiredMs) {
// Remove the leading run of binlogs that the checker reports as expired and return
// the last one removed, or null if nothing was removed.
//
// Side effects: binlogSize is decreased for every removed binlog; when something was
// removed, the first element of allBinlogs (treated as the dummy binlog) gets the
// commit seq of the last removed binlog, and the leading timestamps up to that
// commit seq are released.
// NOTE(review): the visible callers hold lock.writeLock() around this call;
// confirm for any new caller.
private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
    Iterator<TBinlog> iter = allBinlogs.iterator();
    // The first element is skipped here and never removed.
    TBinlog dummyBinlog = iter.next();

    TBinlog lastExpired = null;
    while (iter.hasNext()) {
        TBinlog candidate = iter.next();
        // Stop at the first binlog that is not expired.
        if (!checker.isExpired(candidate)) {
            break;
        }
        iter.remove();
        binlogSize -= BinlogUtils.getApproximateMemoryUsage(candidate);
        lastExpired = candidate;
    }

    if (lastExpired == null) {
        return null;
    }

    long expiredCommitSeq = lastExpired.getCommitSeq();
    dummyBinlog.setCommitSeq(expiredCommitSeq);

    // release expired timestamps by commit seq.
    Iterator<Pair<Long, Long>> tsIter = timestamps.iterator();
    while (tsIter.hasNext()) {
        if (tsIter.next().first > expiredCommitSeq) {
            break;
        }
        tsIter.remove();
    }

    return lastExpired;
}

private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
long ttlSeconds = dbBinlogConfig.getTtlSeconds();
long maxBytes = dbBinlogConfig.getMaxBytes();
long maxHistoryNums = dbBinlogConfig.getMaxHistoryNums();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);

LOG.info("gc db binlog. dbId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, maxHistoryNums: {}",
dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums);

// step 1: get current tableBinlog info and expiredCommitSeq
long expiredCommitSeq = -1;
TBinlog lastExpiredBinlog = null;
lock.writeLock().lock();
try {
long expiredCommitSeq = -1;
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext()) {
Pair<Long, Long> pair = timeIter.next();
if (pair.second <= expiredMs) {
expiredCommitSeq = pair.first;
timeIter.remove();
} else {
if (pair.second > expiredMs) {
break;
}
expiredCommitSeq = pair.first;
}

Iterator<TBinlog> binlogIter = allBinlogs.iterator();
TBinlog dummy = binlogIter.next();
dummy.setCommitSeq(expiredCommitSeq);

while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
if (binlog.getCommitSeq() <= expiredCommitSeq) {
binlogIter.remove();
} else {
break;
}
}
final long lastExpiredCommitSeq = expiredCommitSeq;
BinlogComparator checker = (binlog) -> {
// NOTE: TreeSet read size during iterator remove is valid.
//
// The expired conditions in order:
// 1. expired time
// 2. the max bytes
// 3. the max history num
return binlog.getCommitSeq() <= lastExpiredCommitSeq
|| maxBytes < binlogSize
|| maxHistoryNums < allBinlogs.size();
};
lastExpiredBinlog = getLastExpiredBinlog(checker);
} finally {
lock.writeLock().unlock();
}

if (expiredCommitSeq == -1) {
if (lastExpiredBinlog == null) {
return null;
}

// step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db tombstone
// step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db
// tombstone
List<BinlogTombstone> tableTombstones = Lists.newArrayList();
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
// step 2.1: gc tableBinlog,and get table tombstone
BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(expiredCommitSeq);
BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq());
if (tableTombstone != null) {
tableTombstones.add(tableTombstone);
}
Expand All @@ -386,28 +432,8 @@ public void dbBinlogEnableReplayGc(BinlogTombstone tombstone) {

lock.writeLock().lock();
try {
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext()) {
long commitSeq = timeIter.next().first;
if (commitSeq <= largestExpiredCommitSeq) {
timeIter.remove();
} else {
break;
}
}

Iterator<TBinlog> binlogIter = allBinlogs.iterator();
TBinlog dummy = binlogIter.next();
dummy.setCommitSeq(largestExpiredCommitSeq);

while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
binlogIter.remove();
} else {
break;
}
}
BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= largestExpiredCommitSeq;
getLastExpiredBinlog(checker);
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -478,6 +504,8 @@ public void getBinlogInfo(BaseProcResult result) {
info.add(dropped);
String binlogLength = String.valueOf(allBinlogs.size());
info.add(binlogLength);
String binlogSize = String.valueOf(this.binlogSize);
info.add(binlogSize);
String firstBinlogCommittedTime = null;
String readableFirstBinlogCommittedTime = null;
if (!timestamps.isEmpty()) {
Expand All @@ -497,10 +525,16 @@ public void getBinlogInfo(BaseProcResult result) {
info.add(lastBinlogCommittedTime);
info.add(readableLastBinlogCommittedTime);
String binlogTtlSeconds = null;
String binlogMaxBytes = null;
String binlogMaxHistoryNums = null;
if (binlogConfig != null) {
binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds());
binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes());
binlogMaxHistoryNums = String.valueOf(binlogConfig.getMaxHistoryNums());
}
info.add(binlogTtlSeconds);
info.add(binlogMaxBytes);
info.add(binlogMaxHistoryNums);

result.addRow(info);
} else {
Expand Down
Loading