From 7545634d06f8916ed395ebea448e0e60d1d9d584 Mon Sep 17 00:00:00 2001 From: Jack Drogon Date: Thu, 3 Aug 2023 16:30:45 +0800 Subject: [PATCH] [Enhancement](binlog) Add Barrier log into BinlogManager Signed-off-by: Jack Drogon --- .../org/apache/doris/backup/BackupJob.java | 12 +++-- .../apache/doris/binlog/BinlogManager.java | 22 +++++++++ .../apache/doris/journal/JournalEntity.java | 2 +- .../org/apache/doris/persist/BarrierLog.java | 49 +++++++++++++++++++ .../org/apache/doris/persist/EditLog.java | 11 +++-- gensrc/thrift/FrontendService.thrift | 1 + 6 files changed, 89 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 565efda2d04fcd..fefb26e0b9586d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -36,6 +36,7 @@ import org.apache.doris.catalog.View; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.persist.BarrierLog; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; @@ -376,7 +377,7 @@ private void prepareAndSendSnapshotTask() { OlapTable olapTable = (OlapTable) tbl; checkOlapTable(olapTable, tableRef); if (getContent() == BackupContent.ALL) { - prepareSnapshotTaskForOlapTableWithoutLock((OlapTable) tbl, tableRef, batchTask); + prepareSnapshotTaskForOlapTableWithoutLock(db, (OlapTable) tbl, tableRef, batchTask); } prepareBackupMetaForOlapTableWithoutLock(tableRef, olapTable, copiedTables); break; @@ -430,10 +431,15 @@ private void checkOlapTable(OlapTable olapTable, TableRef backupTableRef) { } } - private void prepareSnapshotTaskForOlapTableWithoutLock(OlapTable olapTable, + private void prepareSnapshotTaskForOlapTableWithoutLock(Database db, OlapTable olapTable, TableRef backupTableRef, AgentBatchTask batchTask) { // Add barrier editolog for barrier commit seq - long commitSeq = env.getEditLog().logBarrier(); + long dbId = db.getId(); + String dbName = db.getFullName(); + long tableId = olapTable.getId(); + String tableName = olapTable.getName(); + BarrierLog barrierLog = new BarrierLog(dbId, dbName, tableId, tableName); + long commitSeq = env.getEditLog().logBarrier(barrierLog); // format as "table:{tableId}" String tableKey = String.format("%s%d", TABLE_COMMIT_SEQ_PREFIX, olapTable.getId()); properties.put(tableKey, String.valueOf(commitSeq)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 43a95ed28ea266..3047146162e876 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -23,6 +23,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.persist.AlterDatabasePropertyInfo; +import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.BinlogGcInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ModifyTablePropertyOperationLog; @@ -247,6 +248,27 @@ public void addModifyTableProperty(ModifyTablePropertyOperationLog info, long co addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true); } + // add Barrier log + public void addBarrierLog(BarrierLog barrierLog, long commitSeq) { + if (barrierLog == null) { + return; + } + + long dbId = barrierLog.getDbId(); + long tableId = barrierLog.getTableId(); + if (dbId == 0 || tableId == 0) { + return; + } + + List tableIds = Lists.newArrayList(); + tableIds.add(tableId); + long timestamp = -1; + TBinlogType type = TBinlogType.BARRIER; + String data = barrierLog.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + } + // get binlog by dbId, return first binlog.version > version public Pair getBinlog(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 43c8227761e5de..e155751d5b5108 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -853,7 +853,7 @@ public void readFields(DataInput in) throws IOException { break; } case OperationType.OP_BARRIER: { - data = new BarrierLog(); + data = BarrierLog.read(in); isRead = true; break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java index e44cbb2f31beca..ea849d217d740c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java @@ -19,16 +19,65 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class BarrierLog implements Writable { + @SerializedName(value = "dbId") + long dbId = 0L; + @SerializedName(value = "dbName") + String dbName; + @SerializedName(value = "tableId") + long tableId = 0L; + @SerializedName(value = "tableName") + String tableName; + public BarrierLog() { } + public BarrierLog(long dbId, String dbName, long tableId, String tableName) { + this.dbId = dbId; + this.dbName = dbName; + this.tableId = tableId; + this.tableName = tableName; + } + + public long getDbId() { + return dbId; + } + + public String getDbName() { + return dbName; + } + + public long getTableId() { + return tableId; + } + + public String getTableName() { + return tableName; + } + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, ""); } + + public static BarrierLog read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), BarrierLog.class); + } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + @Override + public String toString() { + return toJson(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 356eb85daa4e28..d5289e0c32824a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -1077,8 +1077,8 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { break; } case OperationType.OP_BARRIER: { - // the log only for barrier commit seq, not need to replay - LOG.info("replay barrier"); + BarrierLog log = (BarrierLog) journal.getData(); + env.getBinlogManager().addBarrierLog(log, logId); break; } case OperationType.OP_UPDATE_AUTO_INCREMENT_ID: { @@ -1893,8 +1893,11 @@ public long logGcBinlog(BinlogGcInfo log) { return logEdit(OperationType.OP_GC_BINLOG, log); } - public long logBarrier() { - return logEdit(OperationType.OP_BARRIER, new BarrierLog()); + public long logBarrier(BarrierLog log) { + long logId = logEdit(OperationType.OP_BARRIER, log); + Env.getCurrentEnv().getBinlogManager().addBarrierLog(log, logId); + LOG.info("logId {}, barrier {}", logId, log); + return logId; } public void logUpdateAutoIncrementId(AutoIncrementIdUpdateLog log) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 0ea96a15f480b8..bc0225982e2efc 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -967,6 +967,7 @@ enum TBinlogType { DUMMY = 7, ALTER_DATABASE_PROPERTY = 8, MODIFY_TABLE_PROPERTY = 9, + BARRIER = 10, } struct TBinlog {