From d1531d9f8ae0446b02e17e4a9e5c009c65623eb0 Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 17 Jan 2025 16:38:14 +0800 Subject: [PATCH] [feat](binlog) Add lock binlog method (#46887) This is the first PR for locking binlogs. To reduce the cost of maintaining binlogs, an API named lockBinlog has been added. Users use this API to indicate which binlogs are not permitted for GC. --- .../apache/doris/binlog/BinlogManager.java | 25 ++++- .../org/apache/doris/binlog/DBBinlog.java | 54 +++++++++ .../org/apache/doris/binlog/TableBinlog.java | 47 ++++++++ .../doris/service/FrontendServiceImpl.java | 104 +++++++++++++++++- gensrc/thrift/FrontendService.thrift | 24 ++++ 5 files changed, 247 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 723262ff31bc95..b15e511e75ca06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -130,7 +130,7 @@ private void addBinlog(TBinlog binlog, Object raw) { } private void addBinlog(long dbId, List tableIds, long commitSeq, long timestamp, TBinlogType type, - String data, boolean removeEnableCache, Object raw) { + String data, boolean removeEnableCache, Object raw) { if (!Config.enable_feature_binlog) { return; } @@ -447,7 +447,6 @@ public void addDropRollup(DropInfo info, long commitSeq) { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); } - private boolean supportedRecoverInfo(RecoverInfo info) { //table name and partitionName added together. // recover table case, tablename must exist in newer version @@ -518,6 +517,26 @@ public Pair getBinlogLag(long dbId, long tableId, long prevCommit } } + public Pair lockBinlog(long dbId, long tableId, + String jobUniqueId, long lockCommitSeq) { + LOG.debug("lock binlog. dbId: {}, tableId: {}, jobUniqueId: {}, lockCommitSeq: {}", + dbId, tableId, jobUniqueId, lockCommitSeq); + + DBBinlog dbBinlog = null; + lock.readLock().lock(); + try { + dbBinlog = dbBinlogMap.get(dbId); + } finally { + lock.readLock().unlock(); + } + + if (dbBinlog == null) { + LOG.warn("db binlog not found. dbId: {}", dbId); + return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_DB), -1L); + } + return dbBinlog.lockBinlog(tableId, jobUniqueId, lockCommitSeq); + } + // get the dropped partitions of the db. public List getDroppedPartitions(long dbId) { lock.readLock().lock(); @@ -632,7 +651,6 @@ public void removeTable(long dbId, long tableId) { } } - private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) throws TException, IOException { TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE); TBinaryProtocol protocol = new TBinaryProtocol(buffer); @@ -642,7 +660,6 @@ private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) t dos.write(data); } - // not thread safety, do this without lock public long write(DataOutputStream dos, long checksum) throws IOException { if (!Config.enable_feature_binlog) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index b5e8a48df841c0..2fbee550c91304 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -75,6 +75,10 @@ public class DBBinlog { private BinlogConfigCache binlogConfigCache; + // The binlogs that are locked by the syncer. + // syncer id => commit seq + private Map lockedBinlogs; + public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) { lock = new ReentrantReadWriteLock(); this.dbId = binlog.getDbId(); @@ -89,6 +93,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) { droppedPartitions = Lists.newArrayList(); droppedTables = Lists.newArrayList(); droppedIndexes = Lists.newArrayList(); + lockedBinlogs = Maps.newHashMap(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -281,6 +286,55 @@ public Pair getBinlogLag(long tableId, long prevCommitSeq) { } } + public Pair lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) { + TableBinlog tableBinlog = null; + + lock.writeLock().lock(); + try { + if (tableId < 0) { + return lockDbBinlog(jobUniqueId, lockCommitSeq); + } + + tableBinlog = tableBinlogMap.get(tableId); + } finally { + lock.writeLock().unlock(); + } + + if (tableBinlog == null) { + LOG.warn("table binlog not found. dbId: {}, tableId: {}", dbId, tableId); + return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_TABLE), -1L); + } + return tableBinlog.lockBinlog(jobUniqueId, lockCommitSeq); + } + + // Require: the write lock is held by the caller. + private Pair lockDbBinlog(String jobUniqueId, long lockCommitSeq) { + TBinlog firstBinlog = allBinlogs.first(); + TBinlog lastBinlog = allBinlogs.last(); + + if (lockCommitSeq < 0) { + // lock the latest binlog + lockCommitSeq = lastBinlog.getCommitSeq(); + } else if (lockCommitSeq < firstBinlog.getCommitSeq()) { + // lock the first binlog + lockCommitSeq = firstBinlog.getCommitSeq(); + } else if (lastBinlog.getCommitSeq() < lockCommitSeq) { + LOG.warn("try lock future binlogs, dbId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}", + dbId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId); + return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L); + } + + // keep idempotent + Long commitSeq = lockedBinlogs.get(jobUniqueId); + if (commitSeq != null && lockCommitSeq <= commitSeq) { + LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}", commitSeq, jobUniqueId, dbId); + return Pair.of(new TStatus(TStatusCode.OK), commitSeq); + } + + lockedBinlogs.put(jobUniqueId, lockCommitSeq); + return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq); + } + public BinlogTombstone gc() { // check db BinlogConfig dbBinlogConfig = binlogConfigCache.getDBBinlogConfig(dbId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 755a0bfd171248..718fce49ee1b47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -26,8 +26,10 @@ import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,6 +38,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -54,6 +57,10 @@ public class TableBinlog { private BinlogConfigCache binlogConfigCache; + // The binlogs that are locked by the syncer. + // syncer id => commit seq + private Map lockedBinlogs; + public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) { this.dbId = dbId; this.tableId = tableId; @@ -61,6 +68,7 @@ public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbI lock = new ReentrantReadWriteLock(); binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); timestamps = Lists.newArrayList(); + lockedBinlogs = Maps.newHashMap(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -124,6 +132,45 @@ public Pair getBinlogLag(long prevCommitSeq) { } } + public Pair lockBinlog(String jobUniqueId, long lockCommitSeq) { + lock.writeLock().lock(); + try { + return lockTableBinlog(jobUniqueId, lockCommitSeq); + } finally { + lock.writeLock().unlock(); + } + } + + // Require: the lock is held by the caller. + private Pair lockTableBinlog(String jobUniqueId, long lockCommitSeq) { + TBinlog firstBinlog = binlogs.first(); + TBinlog lastBinlog = binlogs.last(); + + if (lockCommitSeq < 0) { + // lock the latest binlog + lockCommitSeq = lastBinlog.getCommitSeq(); + } else if (lockCommitSeq < firstBinlog.getCommitSeq()) { + // lock the first binlog + lockCommitSeq = firstBinlog.getCommitSeq(); + } else if (lastBinlog.getCommitSeq() < lockCommitSeq) { + LOG.warn( + "try lock future binlogs, dbId: {}, tableId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}", + dbId, tableId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId); + return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L); + } + + // keep idempotent + Long commitSeq = lockedBinlogs.get(jobUniqueId); + if (commitSeq != null && lockCommitSeq <= commitSeq) { + LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}, tableId: {}", + commitSeq, jobUniqueId, dbId, tableId); + return Pair.of(new TStatus(TStatusCode.OK), commitSeq); + } + + lockedBinlogs.put(jobUniqueId, lockCommitSeq); + return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq); + } + private Pair getLastUpsertAndLargestCommitSeq(BinlogComparator checker) { if (binlogs.size() <= 1) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 6e744ca5621b9f..5b288792adc1f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -195,6 +195,8 @@ import org.apache.doris.thrift.TLoadTxnCommitResult; import org.apache.doris.thrift.TLoadTxnRollbackRequest; import org.apache.doris.thrift.TLoadTxnRollbackResult; +import org.apache.doris.thrift.TLockBinlogRequest; +import org.apache.doris.thrift.TLockBinlogResult; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TMasterResult; @@ -3283,7 +3285,7 @@ public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) thro public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TException { String clientAddr = getClientAddrAsString(); if (LOG.isDebugEnabled()) { - LOG.debug("receive get binlog request: {}", request); + LOG.debug("receive get binlog lag request: {}", request); } TGetBinlogLagResult result = new TGetBinlogLagResult(); @@ -3294,14 +3296,14 @@ public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TExcep status.setStatusCode(TStatusCode.NOT_MASTER); status.addToErrorMsgs(NOT_MASTER_ERR_MSG); result.setMasterAddress(getMasterAddress()); - LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG); + LOG.error("failed to get binlog lag: {}", NOT_MASTER_ERR_MSG); return result; } try { result = getBinlogLagImpl(request, clientAddr); } catch (UserException e) { - LOG.warn("failed to get binlog: {}", e.getMessage()); + LOG.warn("failed to get binlog lag: {}", e.getMessage()); status.setStatusCode(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(e.getMessage()); } catch (Throwable e) { @@ -3381,6 +3383,102 @@ private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String c return result; } + public TLockBinlogResult lockBinlog(TLockBinlogRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + if (LOG.isDebugEnabled()) { + LOG.debug("receive lock binlog request: {}", request); + } + + TLockBinlogResult result = new TLockBinlogResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + + if (!Env.getCurrentEnv().isMaster()) { + status.setStatusCode(TStatusCode.NOT_MASTER); + status.addToErrorMsgs(NOT_MASTER_ERR_MSG); + result.setMasterAddress(getMasterAddress()); + LOG.error("failed to lock binlog: {}", NOT_MASTER_ERR_MSG); + return result; + } + + try { + result = lockBinlogImpl(request, clientAddr); + } catch (UserException e) { + LOG.warn("failed to lock binlog: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } + + return result; + } + + private TLockBinlogResult lockBinlogImpl(TLockBinlogRequest request, String clientIp) throws UserException { + /// Check all required arg: user, passwd, db, prev_commit_seq + if (!request.isSetUser()) { + throw new UserException("user is not set"); + } + if (!request.isSetPasswd()) { + throw new UserException("passwd is not set"); + } + if (!request.isSetDb()) { + throw new UserException("db is not set"); + } + if (!request.isSetJobUniqueId()) { + throw new UserException("job_unique_id is not set"); + } + + // step 1: check auth + if (Strings.isNullOrEmpty(request.getToken())) { + checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), + request.getTable(), clientIp, PrivPredicate.SELECT); + } + + // step 3: check database + Env env = Env.getCurrentEnv(); + String fullDbName = request.getDb(); + Database db = env.getInternalCatalog().getDbNullable(fullDbName); + if (db == null) { + String dbName = fullDbName; + if (Strings.isNullOrEmpty(request.getCluster())) { + dbName = request.getDb(); + } + throw new UserException("unknown database, database=" + dbName); + } + + // step 4: fetch all tableIds + // lookup tables && convert into tableIdList + long tableId = -1; + if (request.isSetTableId()) { + tableId = request.getTableId(); + } else if (request.isSetTable()) { + String tableName = request.getTable(); + Table table = db.getTableOrMetaException(tableName, TableType.OLAP); + if (table == null) { + throw new UserException("unknown table, table=" + tableName); + } + tableId = table.getId(); + } + + // step 6: lock binlog + long dbId = db.getId(); + String jobUniqueId = request.getJobUniqueId(); + long lockCommitSeq = -1L; + if (request.isSetLockCommitSeq()) { + lockCommitSeq = request.getLockCommitSeq(); + } + Pair statusSeqPair = env.getBinlogManager().lockBinlog( + dbId, tableId, jobUniqueId, lockCommitSeq); + TLockBinlogResult result = new TLockBinlogResult(); + result.setStatus(statusSeqPair.first); + result.setLockedCommitSeq(statusSeqPair.second); + return result; + } + @Override public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException { StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 878eb4104e086f..8f79b2b98acb68 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1162,6 +1162,29 @@ struct TQueryStatsResult { 5: optional map tablet_stats } +// Lock the binlogs, to avoid being GC during sync. +// +// The caller should lock the binlog before backup, and bumps lock commit seq intervally. +// +// The locked binlogs will be kept until the binlog properties ttl_seconds, max_bytes ... are reached. +struct TLockBinlogRequest { + 1: optional string cluster + 2: optional string user + 3: optional string passwd + 4: optional string db + 5: optional string table + 6: optional i64 table_id + 7: optional string token + 8: optional string job_unique_id + 9: optional i64 lock_commit_seq // if not set, lock the latest binlog +} + +struct TLockBinlogResult { + 1: optional Status.TStatus status + 2: optional i64 locked_commit_seq + 3: optional Types.TNetworkAddress master_address +} + struct TGetBinlogRequest { 1: optional string cluster 2: optional string user @@ -1750,6 +1773,7 @@ service FrontendService { TGetBinlogResult getBinlog(1: TGetBinlogRequest request) TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request) TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request) + TLockBinlogResult lockBinlog(1: TLockBinlogRequest request) TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)