From c907983a98a90340ea1e63f4445243c3afb70c36 Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 17 Jan 2025 16:38:14 +0800 Subject: [PATCH] [feat](binlog) Add lock binlog method (#46887) This is the first PR for locking binlogs. To reduce the cost of maintaining binlogs, an API named lockBinlog has been added. Users use this API to indicate which binlogs are not permitted for GC. --- .../apache/doris/binlog/BinlogManager.java | 25 ++++- .../org/apache/doris/binlog/DBBinlog.java | 54 +++++++++ .../org/apache/doris/binlog/TableBinlog.java | 47 ++++++++ .../doris/service/FrontendServiceImpl.java | 104 +++++++++++++++++- gensrc/thrift/FrontendService.thrift | 24 ++++ 5 files changed, 247 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index b21b7e751d5ee9..2403ede6fb3082 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -130,7 +130,7 @@ private void addBinlog(TBinlog binlog, Object raw) { } private void addBinlog(long dbId, List tableIds, long commitSeq, long timestamp, TBinlogType type, - String data, boolean removeEnableCache, Object raw) { + String data, boolean removeEnableCache, Object raw) { if (!Config.enable_feature_binlog) { return; } @@ -431,7 +431,6 @@ public void addDropRollup(DropInfo info, long commitSeq) { addBarrierLog(log, commitSeq); } - private boolean supportedRecoverInfo(RecoverInfo info) { //table name and partitionName added together. // recover table case, tablename must exist in newer version @@ -501,6 +500,26 @@ public Pair getBinlogLag(long dbId, long tableId, long p } } + public Pair lockBinlog(long dbId, long tableId, + String jobUniqueId, long lockCommitSeq) { + LOG.debug("lock binlog. dbId: {}, tableId: {}, jobUniqueId: {}, lockCommitSeq: {}", + dbId, tableId, jobUniqueId, lockCommitSeq); + + DBBinlog dbBinlog = null; + lock.readLock().lock(); + try { + dbBinlog = dbBinlogMap.get(dbId); + } finally { + lock.readLock().unlock(); + } + + if (dbBinlog == null) { + LOG.warn("db binlog not found. dbId: {}", dbId); + return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_DB), -1L); + } + return dbBinlog.lockBinlog(tableId, jobUniqueId, lockCommitSeq); + } + // get the dropped partitions of the db. public List getDroppedPartitions(long dbId) { lock.readLock().lock(); @@ -617,7 +636,6 @@ public void removeTable(long dbId, long tableId) { } } - private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) throws TException, IOException { TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE); TBinaryProtocol protocol = new TBinaryProtocol(buffer); @@ -627,7 +645,6 @@ private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) t dos.write(data); } - // not thread safety, do this without lock public long write(DataOutputStream dos, long checksum) throws IOException { if (!Config.enable_feature_binlog) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 77a086e9872090..675c9dc78a824e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -75,6 +75,10 @@ public class DBBinlog { private BinlogConfigCache binlogConfigCache; + // The binlogs that are locked by the syncer. + // syncer id => commit seq + private Map lockedBinlogs; + public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) { lock = new ReentrantReadWriteLock(); this.dbId = binlog.getDbId(); @@ -89,6 +93,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) { droppedPartitions = Lists.newArrayList(); droppedTables = Lists.newArrayList(); droppedIndexes = Lists.newArrayList(); + lockedBinlogs = Maps.newHashMap(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -281,6 +286,55 @@ public Pair getBinlogLag(long tableId, long prevCommitSe } } + public Pair lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) { + TableBinlog tableBinlog = null; + + lock.writeLock().lock(); + try { + if (tableId < 0) { + return lockDbBinlog(jobUniqueId, lockCommitSeq); + } + + tableBinlog = tableBinlogMap.get(tableId); + } finally { + lock.writeLock().unlock(); + } + + if (tableBinlog == null) { + LOG.warn("table binlog not found. dbId: {}, tableId: {}", dbId, tableId); + return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_TABLE), -1L); + } + return tableBinlog.lockBinlog(jobUniqueId, lockCommitSeq); + } + + // Require: the write lock is held by the caller. + private Pair lockDbBinlog(String jobUniqueId, long lockCommitSeq) { + TBinlog firstBinlog = allBinlogs.first(); + TBinlog lastBinlog = allBinlogs.last(); + + if (lockCommitSeq < 0) { + // lock the latest binlog + lockCommitSeq = lastBinlog.getCommitSeq(); + } else if (lockCommitSeq < firstBinlog.getCommitSeq()) { + // lock the first binlog + lockCommitSeq = firstBinlog.getCommitSeq(); + } else if (lastBinlog.getCommitSeq() < lockCommitSeq) { + LOG.warn("try lock future binlogs, dbId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}", + dbId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId); + return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L); + } + + // keep idempotent + Long commitSeq = lockedBinlogs.get(jobUniqueId); + if (commitSeq != null && lockCommitSeq <= commitSeq) { + LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}", commitSeq, jobUniqueId, dbId); + return Pair.of(new TStatus(TStatusCode.OK), commitSeq); + } + + lockedBinlogs.put(jobUniqueId, lockCommitSeq); + return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq); + } + public BinlogTombstone gc() { // check db BinlogConfig dbBinlogConfig = binlogConfigCache.getDBBinlogConfig(dbId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 4a98768c30454d..b6cf328ecccbdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -26,8 +26,10 @@ import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,6 +38,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -54,6 +57,10 @@ public class TableBinlog { private BinlogConfigCache binlogConfigCache; + // The binlogs that are locked by the syncer. + // syncer id => commit seq + private Map lockedBinlogs; + public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) { this.dbId = dbId; this.tableId = tableId; @@ -61,6 +68,7 @@ public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbI lock = new ReentrantReadWriteLock(); binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); timestamps = Lists.newArrayList(); + lockedBinlogs = Maps.newHashMap(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -124,6 +132,45 @@ public Pair getBinlogLag(long prevCommitSeq) { } } + public Pair lockBinlog(String jobUniqueId, long lockCommitSeq) { + lock.writeLock().lock(); + try { + return lockTableBinlog(jobUniqueId, lockCommitSeq); + } finally { + lock.writeLock().unlock(); + } + } + + // Require: the lock is held by the caller. + private Pair lockTableBinlog(String jobUniqueId, long lockCommitSeq) { + TBinlog firstBinlog = binlogs.first(); + TBinlog lastBinlog = binlogs.last(); + + if (lockCommitSeq < 0) { + // lock the latest binlog + lockCommitSeq = lastBinlog.getCommitSeq(); + } else if (lockCommitSeq < firstBinlog.getCommitSeq()) { + // lock the first binlog + lockCommitSeq = firstBinlog.getCommitSeq(); + } else if (lastBinlog.getCommitSeq() < lockCommitSeq) { + LOG.warn( + "try lock future binlogs, dbId: {}, tableId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}", + dbId, tableId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId); + return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L); + } + + // keep idempotent + Long commitSeq = lockedBinlogs.get(jobUniqueId); + if (commitSeq != null && lockCommitSeq <= commitSeq) { + LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}, tableId: {}", + commitSeq, jobUniqueId, dbId, tableId); + return Pair.of(new TStatus(TStatusCode.OK), commitSeq); + } + + lockedBinlogs.put(jobUniqueId, lockCommitSeq); + return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq); + } + private Pair getLastUpsertAndLargestCommitSeq(BinlogComparator checker) { if (binlogs.size() <= 1) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 45a9c75c7a5742..31d7771f7177bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -191,6 +191,8 @@ import org.apache.doris.thrift.TLoadTxnCommitResult; import org.apache.doris.thrift.TLoadTxnRollbackRequest; import org.apache.doris.thrift.TLoadTxnRollbackResult; +import org.apache.doris.thrift.TLockBinlogRequest; +import org.apache.doris.thrift.TLockBinlogResult; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TMasterResult; @@ -3364,7 +3366,7 @@ public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) thro public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TException { String clientAddr = getClientAddrAsString(); if (LOG.isDebugEnabled()) { - LOG.debug("receive get binlog request: {}", request); + LOG.debug("receive get binlog lag request: {}", request); } TGetBinlogLagResult result = new TGetBinlogLagResult(); @@ -3375,14 +3377,14 @@ public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TExcep status.setStatusCode(TStatusCode.NOT_MASTER); status.addToErrorMsgs(NOT_MASTER_ERR_MSG); result.setMasterAddress(getMasterAddress()); - LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG); + LOG.error("failed to get binlog lag: {}", NOT_MASTER_ERR_MSG); return result; } try { result = getBinlogLagImpl(request, clientAddr); } catch (UserException e) { - LOG.warn("failed to get binlog: {}", e.getMessage()); + LOG.warn("failed to get binlog lag: {}", e.getMessage()); status.setStatusCode(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(e.getMessage()); } catch (Throwable e) { @@ -3465,6 +3467,102 @@ private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String c return result; } + public TLockBinlogResult lockBinlog(TLockBinlogRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + if (LOG.isDebugEnabled()) { + LOG.debug("receive lock binlog request: {}", request); + } + + TLockBinlogResult result = new TLockBinlogResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + + if (!Env.getCurrentEnv().isMaster()) { + status.setStatusCode(TStatusCode.NOT_MASTER); + status.addToErrorMsgs(NOT_MASTER_ERR_MSG); + result.setMasterAddress(getMasterAddress()); + LOG.error("failed to lock binlog: {}", NOT_MASTER_ERR_MSG); + return result; + } + + try { + result = lockBinlogImpl(request, clientAddr); + } catch (UserException e) { + LOG.warn("failed to lock binlog: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } + + return result; + } + + private TLockBinlogResult lockBinlogImpl(TLockBinlogRequest request, String clientIp) throws UserException { + /// Check all required arg: user, passwd, db, prev_commit_seq + if (!request.isSetUser()) { + throw new UserException("user is not set"); + } + if (!request.isSetPasswd()) { + throw new UserException("passwd is not set"); + } + if (!request.isSetDb()) { + throw new UserException("db is not set"); + } + if (!request.isSetJobUniqueId()) { + throw new UserException("job_unique_id is not set"); + } + + // step 1: check auth + if (Strings.isNullOrEmpty(request.getToken())) { + checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), + request.getTable(), clientIp, PrivPredicate.SELECT); + } + + // step 3: check database + Env env = Env.getCurrentEnv(); + String fullDbName = request.getDb(); + Database db = env.getInternalCatalog().getDbNullable(fullDbName); + if (db == null) { + String dbName = fullDbName; + if (Strings.isNullOrEmpty(request.getCluster())) { + dbName = request.getDb(); + } + throw new UserException("unknown database, database=" + dbName); + } + + // step 4: fetch all tableIds + // lookup tables && convert into tableIdList + long tableId = -1; + if (request.isSetTableId()) { + tableId = request.getTableId(); + } else if (request.isSetTable()) { + String tableName = request.getTable(); + Table table = db.getTableOrMetaException(tableName, TableType.OLAP); + if (table == null) { + throw new UserException("unknown table, table=" + tableName); + } + tableId = table.getId(); + } + + // step 6: lock binlog + long dbId = db.getId(); + String jobUniqueId = request.getJobUniqueId(); + long lockCommitSeq = -1L; + if (request.isSetLockCommitSeq()) { + lockCommitSeq = request.getLockCommitSeq(); + } + Pair statusSeqPair = env.getBinlogManager().lockBinlog( + dbId, tableId, jobUniqueId, lockCommitSeq); + TLockBinlogResult result = new TLockBinlogResult(); + result.setStatus(statusSeqPair.first); + result.setLockedCommitSeq(statusSeqPair.second); + return result; + } + @Override public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException { StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index ad993dcbf5d3ff..bf4561ba83569a 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1115,6 +1115,29 @@ struct TQueryStatsResult { 5: optional map tablet_stats } +// Lock the binlogs, to avoid being GC during sync. +// +// The caller should lock the binlog before backup, and bumps lock commit seq intervally. +// +// The locked binlogs will be kept until the binlog properties ttl_seconds, max_bytes ... are reached. +struct TLockBinlogRequest { + 1: optional string cluster + 2: optional string user + 3: optional string passwd + 4: optional string db + 5: optional string table + 6: optional i64 table_id + 7: optional string token + 8: optional string job_unique_id + 9: optional i64 lock_commit_seq // if not set, lock the latest binlog +} + +struct TLockBinlogResult { + 1: optional Status.TStatus status + 2: optional i64 locked_commit_seq + 3: optional Types.TNetworkAddress master_address +} + struct TGetBinlogRequest { 1: optional string cluster 2: optional string user @@ -1680,6 +1703,7 @@ service FrontendService { TGetBinlogResult getBinlog(1: TGetBinlogRequest request) TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request) TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request) + TLockBinlogResult lockBinlog(1: TLockBinlogRequest request) TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)