Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private void addBinlog(TBinlog binlog, Object raw) {
}

private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type,
String data, boolean removeEnableCache, Object raw) {
String data, boolean removeEnableCache, Object raw) {
if (!Config.enable_feature_binlog) {
return;
}
Expand Down Expand Up @@ -447,7 +447,6 @@ public void addDropRollup(DropInfo info, long commitSeq) {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}


private boolean supportedRecoverInfo(RecoverInfo info) {
//table name and partitionName added together.
// recover table case, tablename must exist in newer version
Expand Down Expand Up @@ -509,6 +508,26 @@ public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long prevCommit
}
}

public Pair<TStatus, Long> lockBinlog(long dbId, long tableId,
String jobUniqueId, long lockCommitSeq) {
LOG.debug("lock binlog. dbId: {}, tableId: {}, jobUniqueId: {}, lockCommitSeq: {}",
dbId, tableId, jobUniqueId, lockCommitSeq);

DBBinlog dbBinlog = null;
lock.readLock().lock();
try {
dbBinlog = dbBinlogMap.get(dbId);
} finally {
lock.readLock().unlock();
}

if (dbBinlog == null) {
LOG.warn("db binlog not found. dbId: {}", dbId);
return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_DB), -1L);
}
return dbBinlog.lockBinlog(tableId, jobUniqueId, lockCommitSeq);
}

// get the dropped partitions of the db.
public List<Long> getDroppedPartitions(long dbId) {
lock.readLock().lock();
Expand Down Expand Up @@ -623,7 +642,6 @@ public void removeTable(long dbId, long tableId) {
}
}


private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) throws TException, IOException {
TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE);
TBinaryProtocol protocol = new TBinaryProtocol(buffer);
Expand All @@ -633,7 +651,6 @@ private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) t
dos.write(data);
}


// not thread safety, do this without lock
public long write(DataOutputStream dos, long checksum) throws IOException {
if (!Config.enable_feature_binlog) {
Expand Down
54 changes: 54 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public class DBBinlog {

private BinlogConfigCache binlogConfigCache;

// The binlogs that are locked by the syncer.
// syncer id => commit seq
private Map<String, Long> lockedBinlogs;

public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
lock = new ReentrantReadWriteLock();
this.dbId = binlog.getDbId();
Expand All @@ -89,6 +93,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
droppedPartitions = Lists.newArrayList();
droppedTables = Lists.newArrayList();
droppedIndexes = Lists.newArrayList();
lockedBinlogs = Maps.newHashMap();

TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
Expand Down Expand Up @@ -281,6 +286,55 @@ public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
}
}

public Pair<TStatus, Long> lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) {
TableBinlog tableBinlog = null;

lock.writeLock().lock();
try {
if (tableId < 0) {
return lockDbBinlog(jobUniqueId, lockCommitSeq);
}

tableBinlog = tableBinlogMap.get(tableId);
} finally {
lock.writeLock().unlock();
}

if (tableBinlog == null) {
LOG.warn("table binlog not found. dbId: {}, tableId: {}", dbId, tableId);
return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_TABLE), -1L);
}
return tableBinlog.lockBinlog(jobUniqueId, lockCommitSeq);
}

// Require: the write lock is held by the caller.
private Pair<TStatus, Long> lockDbBinlog(String jobUniqueId, long lockCommitSeq) {
TBinlog firstBinlog = allBinlogs.first();
TBinlog lastBinlog = allBinlogs.last();

if (lockCommitSeq < 0) {
// lock the latest binlog
lockCommitSeq = lastBinlog.getCommitSeq();
} else if (lockCommitSeq < firstBinlog.getCommitSeq()) {
// lock the first binlog
lockCommitSeq = firstBinlog.getCommitSeq();
} else if (lastBinlog.getCommitSeq() < lockCommitSeq) {
LOG.warn("try lock future binlogs, dbId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}",
dbId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId);
return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L);
}

// keep idempotent
Long commitSeq = lockedBinlogs.get(jobUniqueId);
if (commitSeq != null && lockCommitSeq <= commitSeq) {
LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}", commitSeq, jobUniqueId, dbId);
return Pair.of(new TStatus(TStatusCode.OK), commitSeq);
}

lockedBinlogs.put(jobUniqueId, lockCommitSeq);
return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq);
}

public BinlogTombstone gc() {
// check db
BinlogConfig dbBinlogConfig = binlogConfigCache.getDBBinlogConfig(dbId);
Expand Down
47 changes: 47 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -36,6 +38,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -54,13 +57,18 @@ public class TableBinlog {

private BinlogConfigCache binlogConfigCache;

// The binlogs that are locked by the syncer.
// syncer id => commit seq
private Map<String, Long> lockedBinlogs;

public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) {
this.dbId = dbId;
this.tableId = tableId;
this.binlogSize = 0;
lock = new ReentrantReadWriteLock();
binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
timestamps = Lists.newArrayList();
lockedBinlogs = Maps.newHashMap();

TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
Expand Down Expand Up @@ -124,6 +132,45 @@ public Pair<TStatus, Long> getBinlogLag(long prevCommitSeq) {
}
}

public Pair<TStatus, Long> lockBinlog(String jobUniqueId, long lockCommitSeq) {
lock.writeLock().lock();
try {
return lockTableBinlog(jobUniqueId, lockCommitSeq);
} finally {
lock.writeLock().unlock();
}
}

// Require: the lock is held by the caller.
private Pair<TStatus, Long> lockTableBinlog(String jobUniqueId, long lockCommitSeq) {
TBinlog firstBinlog = binlogs.first();
TBinlog lastBinlog = binlogs.last();

if (lockCommitSeq < 0) {
// lock the latest binlog
lockCommitSeq = lastBinlog.getCommitSeq();
} else if (lockCommitSeq < firstBinlog.getCommitSeq()) {
// lock the first binlog
lockCommitSeq = firstBinlog.getCommitSeq();
} else if (lastBinlog.getCommitSeq() < lockCommitSeq) {
LOG.warn(
"try lock future binlogs, dbId: {}, tableId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}",
dbId, tableId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId);
return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L);
}

// keep idempotent
Long commitSeq = lockedBinlogs.get(jobUniqueId);
if (commitSeq != null && lockCommitSeq <= commitSeq) {
LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}, tableId: {}",
commitSeq, jobUniqueId, dbId, tableId);
return Pair.of(new TStatus(TStatusCode.OK), commitSeq);
}

lockedBinlogs.put(jobUniqueId, lockCommitSeq);
return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq);
}

private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
if (binlogs.size() <= 1) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@
import org.apache.doris.thrift.TLoadTxnCommitResult;
import org.apache.doris.thrift.TLoadTxnRollbackRequest;
import org.apache.doris.thrift.TLoadTxnRollbackResult;
import org.apache.doris.thrift.TLockBinlogRequest;
import org.apache.doris.thrift.TLockBinlogResult;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TMasterResult;
Expand Down Expand Up @@ -3206,7 +3208,7 @@ public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) thro
public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get binlog request: {}", request);
LOG.debug("receive get binlog lag request: {}", request);
}

TGetBinlogLagResult result = new TGetBinlogLagResult();
Expand All @@ -3217,14 +3219,14 @@ public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TExcep
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG);
LOG.error("failed to get binlog lag: {}", NOT_MASTER_ERR_MSG);
return result;
}

try {
result = getBinlogLagImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to get binlog: {}", e.getMessage());
LOG.warn("failed to get binlog lag: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
Expand Down Expand Up @@ -3304,6 +3306,102 @@ private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String c
return result;
}

public TLockBinlogResult lockBinlog(TLockBinlogRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive lock binlog request: {}", request);
}

TLockBinlogResult result = new TLockBinlogResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);

if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to lock binlog: {}", NOT_MASTER_ERR_MSG);
return result;
}

try {
result = lockBinlogImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to lock binlog: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}

return result;
}

private TLockBinlogResult lockBinlogImpl(TLockBinlogRequest request, String clientIp) throws UserException {
/// Check all required arg: user, passwd, db, prev_commit_seq
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetJobUniqueId()) {
throw new UserException("job_unique_id is not set");
}

// step 1: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTable(), clientIp, PrivPredicate.SELECT);
}

// step 3: check database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db = env.getInternalCatalog().getDbNullable(fullDbName);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new UserException("unknown database, database=" + dbName);
}

// step 4: fetch all tableIds
// lookup tables && convert into tableIdList
long tableId = -1;
if (request.isSetTableId()) {
tableId = request.getTableId();
} else if (request.isSetTable()) {
String tableName = request.getTable();
Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
if (table == null) {
throw new UserException("unknown table, table=" + tableName);
}
tableId = table.getId();
}

// step 6: lock binlog
long dbId = db.getId();
String jobUniqueId = request.getJobUniqueId();
long lockCommitSeq = -1L;
if (request.isSetLockCommitSeq()) {
lockCommitSeq = request.getLockCommitSeq();
}
Pair<TStatus, Long> statusSeqPair = env.getBinlogManager().lockBinlog(
dbId, tableId, jobUniqueId, lockCommitSeq);
TLockBinlogResult result = new TLockBinlogResult();
result.setStatus(statusSeqPair.first);
result.setLockedCommitSeq(statusSeqPair.second);
return result;
}

@Override
public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException {
StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class);
Expand Down
24 changes: 24 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,29 @@ struct TQueryStatsResult {
5: optional map<i64, i64> tablet_stats
}

// Lock the binlogs, to avoid being GC during sync.
//
// The caller should lock the binlog before backup, and bumps lock commit seq intervally.
//
// The locked binlogs will be kept until the binlog properties ttl_seconds, max_bytes ... are reached.
struct TLockBinlogRequest {
1: optional string cluster
2: optional string user
3: optional string passwd
4: optional string db
5: optional string table
6: optional i64 table_id
7: optional string token
8: optional string job_unique_id
9: optional i64 lock_commit_seq // if not set, lock the latest binlog
}

struct TLockBinlogResult {
1: optional Status.TStatus status
2: optional i64 locked_commit_seq
3: optional Types.TNetworkAddress master_address
}

struct TGetBinlogRequest {
1: optional string cluster
2: optional string user
Expand Down Expand Up @@ -1532,6 +1555,7 @@ service FrontendService {
TGetBinlogResult getBinlog(1: TGetBinlogRequest request)
TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request)
TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request)
TLockBinlogResult lockBinlog(1: TLockBinlogRequest request)

TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)

Expand Down
Loading