datas) {
+ this.datas = datas;
+ }
+
+ public PositionRange getPositionRange() {
+ return positionRange;
+ }
+
+ public void setPositionRange(PositionRange
positionRange) {
+ this.positionRange = positionRange;
+ }
+
+ public void setMemSize(Long memSize) {
+ this.memSize = memSize;
+ }
+
+ public long getMemSize() {
+ return this.memSize;
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/EntryPosition.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/EntryPosition.java
new file mode 100644
index 00000000000000..23deb5e4a05586
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/EntryPosition.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.position;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+
+import com.google.common.base.Strings;
+
+public class EntryPosition {
+ private String journalName;
+ private Long position;
+ private String gtid;
+ private Long executeTime;
+
+ public static final EntryPosition MIN_POS = new EntryPosition("", -1L, null);
+
+ public EntryPosition() {
+ this(null, (Long)null, (Long)null);
+ }
+
+ public EntryPosition(String journalName, Long position, Long timestamp) {
+ this.gtid = null;
+ this.journalName = journalName;
+ this.position = position;
+ this.executeTime = timestamp;
+ }
+
+ public EntryPosition(String journalName, Long position) {
+ this(journalName, position, (Long)null);
+ }
+
+ public String getJournalName() {
+ return this.journalName;
+ }
+
+ public void setJournalName(String journalName) {
+ this.journalName = journalName;
+ }
+
+ public Long getPosition() {
+ return this.position;
+ }
+
+ public void setPosition(Long position) {
+ this.position = position;
+ }
+
+ public Long getExecuteTime() {
+ return this.executeTime;
+ }
+
+ public void setExecuteTime(Long timeStamp) {
+ this.executeTime = timeStamp;
+ }
+
+ public String getGtid() {
+ return this.gtid;
+ }
+
+ public void setGtid(String gtid) {
+ this.gtid = gtid;
+ }
+
+ public int hashCode() {
+ int result = 1;
+ result = 31 * result + (this.journalName == null ? 0 : this.journalName.hashCode());
+ result = 31 * result + (this.position == null ? 0 : this.position.hashCode());
+ result = 31 * result + (this.executeTime == null ? 0 : this.executeTime.hashCode());
+ return result;
+ }
+
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ } else if (!(obj instanceof EntryPosition)) {
+ return false;
+ } else {
+ EntryPosition other = (EntryPosition) obj;
+ if (this.journalName == null) {
+ if (other.journalName != null) {
+ return false;
+ }
+ } else if (!this.journalName.equals(other.journalName)) {
+ return false;
+ }
+
+ if (this.position == null) {
+ if (other.position != null) {
+ return false;
+ }
+ } else if (!this.position.equals(other.position)) {
+ return false;
+ }
+
+ if (this.executeTime == null) {
+ if (other.executeTime != null) {
+ return false;
+ }
+ } else if (!this.executeTime.equals(other.executeTime)) {
+ return false;
+ }
+
+ return true;
+ }
+ }
+ @Override
+ public String toString() {
+ return "[" + journalName + ":" + position + "]";
+ }
+
+ public int compareTo(EntryPosition o) {
+ final int val = journalName.compareTo(o.journalName);
+
+ if (val == 0) {
+ return (int) (position - o.position);
+ }
+ return val;
+ }
+
+ public static EntryPosition min(EntryPosition position1, EntryPosition position2) {
+ if (position1.getJournalName().compareTo(position2.getJournalName()) > 0) {
+ return position2;
+ } else if (position1.getJournalName().compareTo(position2.getJournalName()) < 0) {
+ return position1;
+ } else {
+ if (position1.getPosition() > position2.getPosition()) {
+ return position2;
+ } else {
+ return position1;
+ }
+ }
+ }
+
+ // --------helper methods---------
+
+ public static EntryPosition createPosition(CanalEntry.Entry entry) {
+ final CanalEntry.Header header = entry.getHeader();
+ EntryPosition position = new EntryPosition();
+ position.setJournalName(header.getLogfileName());
+ position.setPosition(header.getLogfileOffset());
+ position.setExecuteTime(header.getExecuteTime());
+ position.setGtid(header.getGtid());
+ return position;
+ }
+
+ public static boolean checkPosition(CanalEntry.Entry entry, EntryPosition entryPosition) {
+ return checkPosition(entry.getHeader(), entryPosition);
+ }
+
+ public static boolean checkPosition(CanalEntry.Header header, EntryPosition entryPosition) {
+ boolean result = entryPosition.getExecuteTime().equals(header.getExecuteTime());
+ boolean isEmptyPosition = (Strings.isNullOrEmpty(entryPosition.getJournalName()) && entryPosition.getPosition() == null);
+ if (!isEmptyPosition) {
+ result &= entryPosition.getPosition().equals(header.getLogfileOffset());
+ if (result) {
+ result &= header.getLogfileName().equals(entryPosition.getJournalName());
+ }
+ }
+ return result;
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java
new file mode 100644
index 00000000000000..4d68315c331661
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.position;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class PositionMeta<T> {
+ // max batch id
+ private long maxBatchId;
+ // batch Id -> position range
+ private Map<Long, PositionRange<T>> batches;
+ // channel Id -> commit position
+ private Map<Long, T> commitPositions;
+ // ack position
+ private T ackPosition;
+ // ack time
+ private long ackTime;
+
+ public PositionMeta() {
+ this.maxBatchId = -1L;
+ this.batches = Maps.newHashMap();
+ this.commitPositions = Maps.newHashMap();
+ }
+ public void addBatch(long batchId, PositionRange<T> range) {
+ updateMaxBatchId(batchId);
+ batches.put(batchId, range);
+ }
+
+ public PositionRange<T> removeBatch(long batchId) {
+ return batches.remove(batchId);
+ }
+
+ public void clearAllBatch() {
+ batches.clear();
+ }
+
+ public void setCommitPosition(long channelId, T position) {
+ commitPositions.put(channelId, position);
+ }
+
+ public T getCommitPosition(long channelId) {
+ return commitPositions.get(channelId);
+ }
+
+ public void setAckPosition(T ackPosition) {
+ this.ackPosition = ackPosition;
+ }
+
+ public T getAckPosition() {
+ return this.ackPosition;
+ }
+
+ public void setAckTime(long ackTime) {
+ this.ackTime = ackTime;
+ }
+
+ public long getAckTime() {
+ return this.ackTime;
+ }
+
+ public T getLatestPosition() {
+ if (batches.isEmpty()) {
+ return null;
+ } else {
+ return batches.get(maxBatchId).getEnd();
+ }
+ }
+
+ private void updateMaxBatchId(long batchId) {
+ if (maxBatchId < batchId) {
+ maxBatchId = batchId;
+ }
+ }
+
+ public void cleanUp() {
+ this.maxBatchId = -1L;
+ this.batches.clear();
+ this.commitPositions.clear();
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionRange.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionRange.java
new file mode 100644
index 00000000000000..a33fe41db13835
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionRange.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.position;
+
+public class PositionRange<T> {
+ private T start;
+ private T end;
+
+ public PositionRange() {
+ }
+
+ public PositionRange(T start, T end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ public T getStart() {
+ return start;
+ }
+
+ public void setStart(T start) {
+ this.start = start;
+ }
+
+ public T getEnd() {
+ return end;
+ }
+
+ public void setEnd(T end) {
+ this.end = end;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((end == null) ? 0 : end.hashCode());
+ result = prime * result + ((start == null) ? 0 : start.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof PositionRange)) {
+ return false;
+ }
+
+ PositionRange other = (PositionRange) obj;
+ if (end == null) {
+ if (other.end != null) {
+ return false;
+ }
+ } else if (!end.equals(other.end)) {
+ return false;
+ }
+
+ if (start == null) {
+ if (other.start != null) {
+ return false;
+ }
+ } else if (!start.equals(other.start)) {
+ return false;
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 728e89ecd6855d..7fbcd476ab9d83 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -29,11 +29,11 @@
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.EncryptKey;
import org.apache.doris.catalog.EncryptKeyHelper;
+import org.apache.doris.catalog.EncryptKeySearchDesc;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSearchDesc;
-import org.apache.doris.catalog.EncryptKey;
-import org.apache.doris.catalog.EncryptKeySearchDesc;
import org.apache.doris.catalog.Resource;
import org.apache.doris.cluster.BaseParam;
import org.apache.doris.cluster.Cluster;
@@ -59,6 +59,7 @@
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
@@ -715,6 +716,16 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
catalog.getLoadManager().replayUpdateLoadJobStateInfo(info);
break;
}
+ case OperationType.OP_CREATE_SYNC_JOB: {
+ SyncJob syncJob = (SyncJob) journal.getData();
+ catalog.getSyncJobManager().replayAddSyncJob(syncJob);
+ break;
+ }
+ case OperationType.OP_UPDATE_SYNC_JOB_STATE: {
+ SyncJob.SyncJobUpdateStateInfo info = (SyncJob.SyncJobUpdateStateInfo) journal.getData();
+ catalog.getSyncJobManager().replayUpdateSyncJobState(info);
+ break;
+ }
case OperationType.OP_FETCH_STREAM_LOAD_RECORD: {
FetchStreamLoadRecord fetchStreamLoadRecord = (FetchStreamLoadRecord) journal.getData();
catalog.getStreamLoadRecordMgr().replayFetchStreamLoadRecord(fetchStreamLoadRecord);
@@ -1335,6 +1346,14 @@ public void logUpdateLoadJob(LoadJobStateUpdateInfo info) {
logEdit(OperationType.OP_UPDATE_LOAD_JOB, info);
}
+ public void logCreateSyncJob(SyncJob syncJob) {
+ logEdit(OperationType.OP_CREATE_SYNC_JOB, syncJob);
+ }
+
+ public void logUpdateSyncJobState(SyncJob.SyncJobUpdateStateInfo info) {
+ logEdit(OperationType.OP_UPDATE_SYNC_JOB_STATE, info);
+ }
+
public void logFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) {
logEdit(OperationType.OP_FETCH_STREAM_LOAD_RECORD, fetchStreamLoadRecord);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index f591e5bc32979e..9ac9aa57ed191c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -172,6 +172,10 @@ public class OperationType {
public static final short OP_UPDATE_LOAD_JOB = 232;
// fetch stream load record
public static final short OP_FETCH_STREAM_LOAD_RECORD = 233;
+ // create sync job
+ public static final short OP_CREATE_SYNC_JOB = 234;
+ // update sync job state
+ public static final short OP_UPDATE_SYNC_JOB_STATE = 235;
// small files 251~260
public static final short OP_CREATE_SMALL_FILE = 251;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 56e8ac8a8b0696..300f25f0945a7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -32,7 +32,8 @@
import org.apache.doris.catalog.StructType;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
-
+import org.apache.doris.load.sync.SyncJob;
+import org.apache.doris.load.sync.canal.CanalSyncJob;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
@@ -42,16 +43,6 @@
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
@@ -70,7 +61,17 @@
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
-import org.apache.commons.lang3.reflect.TypeUtils;
+
+import org.apache.commons.lang3.reflect.TypeUtils;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/*
* Some utilities about Gson.
@@ -116,6 +117,11 @@ public class GsonUtils {
.registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName())
.registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName());
+ // runtime adapter for class "SyncJob"
+ private static RuntimeTypeAdapterFactory<SyncJob> syncJobTypeAdapterFactory = RuntimeTypeAdapterFactory
+ .of(SyncJob.class, "clazz")
+ .registerSubtype(CanalSyncJob.class, CanalSyncJob.class.getSimpleName());
+
// runtime adapter for class "LoadJobStateUpdateInfo"
private static RuntimeTypeAdapterFactory loadJobStateUpdateInfoTypeAdapterFactory
= RuntimeTypeAdapterFactory
@@ -134,6 +140,7 @@ public class GsonUtils {
.registerTypeAdapterFactory(distributionInfoTypeAdapterFactory)
.registerTypeAdapterFactory(resourceTypeAdapterFactory)
.registerTypeAdapterFactory(alterJobV2TypeAdapterFactory)
+ .registerTypeAdapterFactory(syncJobTypeAdapterFactory)
.registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 1045caf5ad67ab..f9e76ec1691916 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -35,10 +35,11 @@
import org.apache.doris.analysis.CancelBackupStmt;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CreateClusterStmt;
+import org.apache.doris.analysis.CreateDataSyncJobStmt;
import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateEncryptKeyStmt;
import org.apache.doris.analysis.CreateFileStmt;
import org.apache.doris.analysis.CreateFunctionStmt;
-import org.apache.doris.analysis.CreateEncryptKeyStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.CreateResourceStmt;
@@ -52,9 +53,9 @@
import org.apache.doris.analysis.DeleteStmt;
import org.apache.doris.analysis.DropClusterStmt;
import org.apache.doris.analysis.DropDbStmt;
+import org.apache.doris.analysis.DropEncryptKeyStmt;
import org.apache.doris.analysis.DropFileStmt;
import org.apache.doris.analysis.DropFunctionStmt;
-import org.apache.doris.analysis.DropEncryptKeyStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropRepositoryStmt;
import org.apache.doris.analysis.DropResourceStmt;
@@ -67,14 +68,17 @@
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.MigrateDbStmt;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
+import org.apache.doris.analysis.PauseSyncJobStmt;
import org.apache.doris.analysis.RecoverDbStmt;
import org.apache.doris.analysis.RecoverPartitionStmt;
import org.apache.doris.analysis.RecoverTableStmt;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.ResumeRoutineLoadStmt;
+import org.apache.doris.analysis.ResumeSyncJobStmt;
import org.apache.doris.analysis.RevokeStmt;
import org.apache.doris.analysis.SetUserPropertyStmt;
import org.apache.doris.analysis.StopRoutineLoadStmt;
+import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.analysis.SyncStmt;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.UninstallPluginStmt;
@@ -84,6 +88,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.sync.SyncJobManager;
public class DdlExecutor {
public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception {
@@ -240,6 +245,21 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception {
catalog.getResourceMgr().createResource((CreateResourceStmt) ddlStmt);
} else if (ddlStmt instanceof DropResourceStmt) {
catalog.getResourceMgr().dropResource((DropResourceStmt) ddlStmt);
+ } else if (ddlStmt instanceof CreateDataSyncJobStmt) {
+ CreateDataSyncJobStmt createSyncJobStmt = (CreateDataSyncJobStmt) ddlStmt;
+ SyncJobManager syncJobMgr = catalog.getSyncJobManager();
+ if (!syncJobMgr.isJobNameExist(createSyncJobStmt.getDbName(), createSyncJobStmt.getJobName())) {
+ syncJobMgr.addDataSyncJob((CreateDataSyncJobStmt) ddlStmt);
+ } else {
+ throw new DdlException("The syncJob with jobName '" + createSyncJobStmt.getJobName() +
+ "' in database [" + createSyncJobStmt.getDbName() + "] is already exists.");
+ }
+ } else if (ddlStmt instanceof ResumeSyncJobStmt) {
+ catalog.getSyncJobManager().resumeSyncJob((ResumeSyncJobStmt) ddlStmt);
+ } else if (ddlStmt instanceof PauseSyncJobStmt) {
+ catalog.getSyncJobManager().pauseSyncJob((PauseSyncJobStmt) ddlStmt);
+ } else if (ddlStmt instanceof StopSyncJobStmt) {
+ catalog.getSyncJobManager().stopSyncJob((StopSyncJobStmt) ddlStmt);
} else {
throw new DdlException("Unknown statement.");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index ed49e7cfc27f28..54ca69fb7e7ad9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -41,6 +41,7 @@
import org.apache.doris.analysis.ShowDbStmt;
import org.apache.doris.analysis.ShowDeleteStmt;
import org.apache.doris.analysis.ShowDynamicPartitionStmt;
+import org.apache.doris.analysis.ShowEncryptKeysStmt;
import org.apache.doris.analysis.ShowEnginesStmt;
import org.apache.doris.analysis.ShowExportStmt;
import org.apache.doris.analysis.ShowFrontendsStmt;
@@ -69,6 +70,7 @@
import org.apache.doris.analysis.ShowSnapshotStmt;
import org.apache.doris.analysis.ShowStmt;
import org.apache.doris.analysis.ShowStreamLoadStmt;
+import org.apache.doris.analysis.ShowSyncJobStmt;
import org.apache.doris.analysis.ShowTableIdStmt;
import org.apache.doris.analysis.ShowTableStatusStmt;
import org.apache.doris.analysis.ShowTableStmt;
@@ -86,9 +88,9 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DynamicPartitionProperty;
+import org.apache.doris.catalog.EncryptKey;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.Index;
-import org.apache.doris.catalog.EncryptKey;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MetadataViewer;
@@ -144,7 +146,6 @@
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUnit;
import org.apache.doris.transaction.GlobalTransactionMgr;
-
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -302,6 +303,8 @@ public ShowResultSet execute() throws AnalysisException {
handleShowLoadProfile();
} else if (stmt instanceof AdminShowDataSkewStmt) {
handleAdminShowDataSkew();
+ } else if (stmt instanceof ShowSyncJobStmt) {
+ handleShowSyncJobs();
} else {
handleEmtpy();
}
@@ -1729,6 +1732,29 @@ private void handleShowRestore() throws AnalysisException {
resultSet = new ShowResultSet(showStmt.getMetaData(), infos);
}
+ private void handleShowSyncJobs() throws AnalysisException {
+ ShowSyncJobStmt showStmt = (ShowSyncJobStmt) stmt;
+ Catalog catalog = Catalog.getCurrentCatalog();
+ Database db = catalog.getDb(showStmt.getDbName());
+ if (db == null) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName());
+ }
+
+ List<List<Comparable>> syncInfos = catalog.getSyncJobManager().getSyncJobsInfoByDbId(db.getId());
+ Collections.sort(syncInfos, new ListComparator<List<Comparable>>(0));
+
+ List<List<String>> rows = Lists.newArrayList();
+ for (List<Comparable> syncInfo : syncInfos) {
+ List<String> row = new ArrayList<String>(syncInfo.size());
+
+ for (Comparable element : syncInfo) {
+ row.add(element.toString());
+ }
+ rows.add(row);
+ }
+ resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
+ }
+
private void handleShowGrants() {
ShowGrantsStmt showStmt = (ShowGrantsStmt) stmt;
List<List<String>> infos = Catalog.getCurrentCatalog().getAuth().getAuthInfo(showStmt.getUserIdent());
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 9a5bcc5e662645..f4196225459df9 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -108,6 +108,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("begin", new Integer(SqlParserSymbols.KW_BEGIN));
keywordMap.put("between", new Integer(SqlParserSymbols.KW_BETWEEN));
keywordMap.put("bigint", new Integer(SqlParserSymbols.KW_BIGINT));
+ keywordMap.put("binlog", new Integer(SqlParserSymbols.KW_BINLOG));
keywordMap.put("bitmap", new Integer(SqlParserSymbols.KW_BITMAP));
keywordMap.put("bitmap_union", new Integer(SqlParserSymbols.KW_BITMAP_UNION));
keywordMap.put("boolean", new Integer(SqlParserSymbols.KW_BOOLEAN));
@@ -236,6 +237,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("isnull", new Integer(SqlParserSymbols.KW_ISNULL));
keywordMap.put("isolation", new Integer(SqlParserSymbols.KW_ISOLATION));
keywordMap.put("install", new Integer(SqlParserSymbols.KW_INSTALL));
+ keywordMap.put("job", new Integer(SqlParserSymbols.KW_JOB));
keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN));
keywordMap.put("key", new Integer(SqlParserSymbols.KW_KEY));
keywordMap.put("keys", new Integer(SqlParserSymbols.KW_KEYS));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDataSyncJobStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDataSyncJobStmtTest.java
new file mode 100644
index 00000000000000..fa029ac739e0d3
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDataSyncJobStmtTest.java
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.sync.DataSyncJobType;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
+public class CreateDataSyncJobStmtTest {
+ private static final Logger LOG = LogManager.getLogger(CreateDataSyncJobStmtTest.class);
+
+ private String jobName = "testJob";
+ private String dbName = "testDb";
+ private String tblName = "testTbl";
+ private Map<String, String> properties;
+
+ @Mocked
+ Catalog catalog;
+ @Mocked
+ Analyzer analyzer;
+ @Mocked
+ PaloAuth auth;
+ @Injectable
+ Database database;
+ @Injectable
+ OlapTable table;
+
+ @Before
+ public void setUp() {
+ properties = Maps.newHashMap();
+ new Expectations() {
+ {
+ catalog.getDb("testCluster:testDb");
+ minTimes = 0;
+ result = database;
+
+ catalog.getAuth();
+ minTimes = 0;
+ result = auth;
+
+ analyzer.getClusterName();
+ minTimes = 0;
+ result = "testCluster";
+
+ auth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any);
+ minTimes = 0;
+ result = true;
+
+ database.getTable("testTbl");
+ minTimes = 0;
+ result = table;
+
+ Catalog.getCurrentCatalog();
+ minTimes = 0;
+ result = catalog;
+ }
+ };
+
+ Config.enable_create_sync_job = true;
+ }
+ @Test
+ public void testNoDb() {
+ CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+ null, null, null, null, null);
+ try {
+ stmt.analyze(analyzer);
+ Assert.fail();
+ } catch (UserException e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testNoType() {
+ BinlogDesc binlogDesc = new BinlogDesc(properties);
+ CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+ jobName, dbName, null, binlogDesc, null);
+ try {
+ stmt.analyze(analyzer);
+ Assert.fail();
+ } catch (UserException e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDuplicateColNames() {
+ properties.put("type", "canal");
+ BinlogDesc binlogDesc = new BinlogDesc(properties);
+ List<String> colNames = Lists.newArrayList();
+ colNames.add("a");
+ colNames.add("a");
+ ChannelDescription channelDescription = new ChannelDescription(
+ "mysql_db", "mysql_tbl", tblName, null, colNames);
+ CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+ jobName, dbName, Lists.newArrayList(channelDescription), binlogDesc, null);
+ try {
+ stmt.analyze(analyzer);
+ Assert.fail();
+ } catch (UserException e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testNoUniqueTable() {
+ properties.put("type", "canal");
+ BinlogDesc binlogDesc = new BinlogDesc(properties);
+ ChannelDescription channelDescription = new ChannelDescription(
+ "mysql_db", "mysql_tbl", tblName, null, null);
+ CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+ jobName, dbName, Lists.newArrayList(channelDescription), binlogDesc, null);
+ try {
+ stmt.analyze(analyzer);
+ Assert.fail();
+ } catch (UserException e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testNormal() {
+ new Expectations() {
+ {
+ table.getKeysType();
+ result = KeysType.UNIQUE_KEYS;
+ table.hasDeleteSign();
+ result = true;
+ }
+ };
+ properties.put("type", "canal");
+ BinlogDesc binlogDesc = new BinlogDesc(properties);
+ ChannelDescription channelDescription = new ChannelDescription(
+ "mysql_db", "mysql_tbl", tblName, null, null);
+ CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+ jobName, dbName, Lists.newArrayList(channelDescription), binlogDesc, null);
+ try {
+ stmt.analyze(analyzer);
+ Assert.assertEquals(jobName, stmt.getJobName());
+ Assert.assertEquals("testCluster:testDb", stmt.getDbName());
+ Assert.assertEquals(DataSyncJobType.CANAL, stmt.getDataSyncJobType());
+ } catch (UserException e) {
+ }
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
new file mode 100644
index 00000000000000..232b414ec91a98
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.analysis.CreateDataSyncJobStmt;
+import org.apache.doris.analysis.PauseSyncJobStmt;
+import org.apache.doris.analysis.ResumeSyncJobStmt;
+import org.apache.doris.analysis.StopSyncJobStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.SyncJob.JobState;
+import org.apache.doris.load.sync.SyncJob.SyncJobUpdateStateInfo;
+import org.apache.doris.load.sync.canal.CanalSyncJob;
+import org.apache.doris.load.sync.canal.SyncCanalClient;
+import org.apache.doris.persist.EditLog;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
+public class SyncJobManagerTest {
+ private static final Logger LOG = LogManager.getLogger(SyncJobManagerTest.class);
+
+ private long jobId = 10000L;
+ private String jobName = "testJob";
+ private long dbId = 50000L;
+
+ @Mocked
+ EditLog editLog;
+ @Mocked
+ Catalog catalog;
+ @Mocked
+ Database database;
+ @Mocked
+ SyncCanalClient client;
+
+ @Before
+ public void setUp() throws DdlException {
+ new Expectations() {
+ {
+ catalog.getEditLog();
+ minTimes = 0;
+ result = editLog;
+ catalog.getDb(anyString);
+ minTimes = 0;
+ result = database;
+ database.getId();
+ minTimes = 0;
+ result = dbId;
+ Catalog.getCurrentCatalog();
+ result = catalog;
+ }
+ };
+ }
+
+ @Test
+ public void testAddSyncJob(@Injectable CreateDataSyncJobStmt stmt,
+ @Mocked SyncJob syncJob) throws DdlException {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ new Expectations() {
+ {
+ SyncJob.fromStmt(anyLong, (CreateDataSyncJobStmt) any);
+ result = canalSyncJob;
+ }
+ };
+
+ SyncJobManager manager = new SyncJobManager();
+ manager.addDataSyncJob(stmt);
+
+ Map idToSyncJobs = Deencapsulation.getField(manager, "idToSyncJob");
+ Assert.assertEquals(1, idToSyncJobs.size());
+ SyncJob syncJob1 = idToSyncJobs.values().iterator().next();
+ Assert.assertEquals(10000L, syncJob1.getId());
+ Assert.assertEquals("testJob", syncJob1.getJobName());
+ Assert.assertEquals(50000L, syncJob1.getDbId());
+ Assert.assertEquals(JobState.PENDING, syncJob1.getJobState());
+ Assert.assertEquals(DataSyncJobType.CANAL, syncJob1.getJobType());
+ Assert.assertTrue(syncJob1 instanceof CanalSyncJob);
+
+ Map>> dbIdToJobNameToSyncJobs =
+ Deencapsulation.getField(manager, "dbIdToJobNameToSyncJobs");
+ Assert.assertEquals(1, dbIdToJobNameToSyncJobs.size());
+ Map> jobNameToSyncJobs = dbIdToJobNameToSyncJobs.values().iterator().next();
+ Assert.assertEquals(1, jobNameToSyncJobs.size());
+ Assert.assertTrue(jobNameToSyncJobs.containsKey("testJob"));
+ List syncJobs = jobNameToSyncJobs.get("testJob");
+ Assert.assertEquals(1, syncJobs.size());
+ SyncJob syncJob2 = syncJobs.get(0);
+ Assert.assertEquals(syncJob1, syncJob2);
+ }
+
+ @Test
+ public void testPauseSyncJob(@Injectable PauseSyncJobStmt stmt) {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ new Expectations() {
+ {
+ stmt.getJobName();
+ result = "testJob";
+
+ stmt.getDbFullName();
+ result = "testDb";
+ }
+ };
+
+ SyncJobManager manager = new SyncJobManager();
+ try {
+ manager.pauseSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // add a sync job to manager
+ Map>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+ Map> jobNameToSyncJobs = Maps.newHashMap();
+ jobNameToSyncJobs.put("testJob", Lists.newArrayList(canalSyncJob));
+ dbIdToJobNameToSyncJobs.put(50000L, jobNameToSyncJobs);
+ Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+
+ // a new sync job state is pending
+ Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+ try {
+ manager.pauseSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // change sync job state to paused
+ canalSyncJob.updateState(JobState.PAUSED, false);
+ Assert.assertEquals(JobState.PAUSED, canalSyncJob.getJobState());
+ try {
+ manager.pauseSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // change sync job state to cancelled
+ canalSyncJob.updateState(JobState.CANCELLED, false);
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+ try {
+ manager.pauseSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // change sync job state to running
+ canalSyncJob.updateState(JobState.RUNNING, false);
+ Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+ try {
+ manager.pauseSyncJob(stmt);
+ Assert.assertEquals(JobState.PAUSED, canalSyncJob.getJobState());
+ } catch (DdlException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testResumeSyncJob(@Injectable ResumeSyncJobStmt stmt) {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ new Expectations() {
+ {
+ stmt.getJobName();
+ result = "testJob";
+
+ stmt.getDbFullName();
+ result = "testDb";
+ }
+ };
+
+ Deencapsulation.setField(canalSyncJob, "client", client);
+
+ SyncJobManager manager = new SyncJobManager();
+ try {
+ manager.resumeSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // add a sync job to manager
+ Map>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+ Map> jobNameToSyncJobs = Maps.newHashMap();
+ jobNameToSyncJobs.put("testJob", Lists.newArrayList(canalSyncJob));
+ dbIdToJobNameToSyncJobs.put(50000L, jobNameToSyncJobs);
+ Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+
+ // a new sync job state is pending
+ Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+ try {
+ manager.resumeSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // change sync job state to running
+ canalSyncJob.updateState(JobState.RUNNING, false);
+ Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+ try {
+ manager.resumeSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // change sync job state to cancelled
+ canalSyncJob.updateState(JobState.CANCELLED, false);
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+ try {
+ manager.resumeSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // change sync job state to paused
+ canalSyncJob.updateState(JobState.PAUSED, false);
+ Assert.assertEquals(JobState.PAUSED, canalSyncJob.getJobState());
+ try {
+ manager.resumeSyncJob(stmt);
+ Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+ } catch (DdlException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testStopSyncJob(@Injectable StopSyncJobStmt stmt) {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ new Expectations() {
+ {
+ stmt.getJobName();
+ result = "testJob";
+
+ stmt.getDbFullName();
+ result = "testDb";
+ }
+ };
+
+ Deencapsulation.setField(canalSyncJob, "client", client);
+
+ SyncJobManager manager = new SyncJobManager();
+ try {
+ manager.stopSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // add a sync job to manager
+ Map>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+ Map> jobNameToSyncJobs = Maps.newHashMap();
+ jobNameToSyncJobs.put("testJob", Lists.newArrayList(canalSyncJob));
+ dbIdToJobNameToSyncJobs.put(50000L, jobNameToSyncJobs);
+ Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+
+ // a new sync job state is pending
+ Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+ try {
+ manager.stopSyncJob(stmt);
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // change sync job state to paused
+ canalSyncJob.updateState(JobState.PAUSED, false);
+ Assert.assertEquals(JobState.PAUSED, canalSyncJob.getJobState());
+ try {
+ manager.stopSyncJob(stmt);
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+
+ // change sync job state to running
+ canalSyncJob.updateState(JobState.RUNNING, false);
+ Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+ try {
+ manager.stopSyncJob(stmt);
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+ } catch (DdlException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // change sync job state to cancelled
+ canalSyncJob.updateState(JobState.CANCELLED, false);
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+ try {
+ manager.stopSyncJob(stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testJobNameExist() throws DdlException {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ SyncJobManager manager = new SyncJobManager();
+ Assert.assertFalse(manager.isJobNameExist("testDb", "testJob"));
+
+ // add a sync job to manager
+ Map>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+ Map> jobNameToSyncJobs = Maps.newHashMap();
+ jobNameToSyncJobs.put("testJob", Lists.newArrayList(canalSyncJob));
+ dbIdToJobNameToSyncJobs.put(50000L, jobNameToSyncJobs);
+ Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+ Assert.assertTrue(manager.isJobNameExist("testDb", "testJob"));
+ }
+ @Test
+ public void testReplayUpdateSyncJobState() {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ // change sync job state to running
+ canalSyncJob.updateState(JobState.RUNNING, false);
+ Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+
+ Deencapsulation.setField(canalSyncJob, "client", client);
+ Deencapsulation.setField(canalSyncJob, "channels", Lists.newArrayList());
+
+ SyncJobUpdateStateInfo info = new SyncJobUpdateStateInfo(jobId,
+ JobState.CANCELLED, -1L, -1L, -1L,
+ new SyncFailMsg(MsgType.USER_CANCEL, "user cancel"));
+ SyncJobManager manager = new SyncJobManager();
+
+ // add a sync job to manager
+ Map idToSyncJob = Maps.newHashMap();
+ idToSyncJob.put(jobId, canalSyncJob);
+ Deencapsulation.setField(manager, "idToSyncJob", idToSyncJob);
+ manager.replayUpdateSyncJobState(info);
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+ Assert.assertEquals(MsgType.USER_CANCEL, canalSyncJob.getFailMsg().getMsgType());
+ }
+
+
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobTest.java
new file mode 100644
index 00000000000000..c35ee682846e34
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobTest.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.SyncJob.JobState;
+import org.apache.doris.load.sync.SyncJob.SyncJobUpdateStateInfo;
+import org.apache.doris.load.sync.canal.CanalSyncJob;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class SyncJobTest {
+ private long jobId;
+ private long dbId;
+ private String jobName;
+
+ @Before
+ public void setUp() {
+ jobId = 1L;
+ dbId = 1L;
+ jobName = "test_job";
+ }
+
+ @Test
+ public void testUpdateStateToRunning() {
+ SyncJob syncJob = new CanalSyncJob(jobId, jobName, dbId);
+ syncJob.updateState(JobState.RUNNING, true);
+ Assert.assertEquals(JobState.RUNNING, syncJob.getJobState());
+ Assert.assertNotEquals(-1L, (long) Deencapsulation.getField(syncJob, "lastStartTimeMs"));
+ }
+
+ @Test
+ public void testUpdateStateInfoPersist() throws IOException {
+ String fileName = "./testSyncJobUpdateStateInfoPersistFile";
+ File file = new File(fileName);
+ if (file.exists()) {
+ file.delete();
+ }
+ file.createNewFile();
+
+ JobState jobState = JobState.CANCELLED;
+ SyncFailMsg failMsg = new SyncFailMsg(MsgType.USER_CANCEL, "user cancel");
+ long lastStartTimeMs = 1621914540L;
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+ SyncJobUpdateStateInfo info = new SyncJobUpdateStateInfo(
+ jobId, jobState, lastStartTimeMs, -1L, -1L, failMsg);
+ info.write(out);
+ out.flush();
+ out.close();
+
+ DataInputStream in = new DataInputStream(new FileInputStream(file));
+ SyncJobUpdateStateInfo replayedInfo = SyncJobUpdateStateInfo.read(in);
+ Assert.assertEquals(jobId, replayedInfo.getId());
+ Assert.assertEquals(jobState, replayedInfo.getJobState());
+ Assert.assertEquals(lastStartTimeMs, replayedInfo.getLastStartTimeMs());
+ Assert.assertEquals(-1L, replayedInfo.getLastStopTimeMs());
+ Assert.assertEquals(-1L, replayedInfo.getFinishTimeMs());
+ Assert.assertEquals(failMsg, replayedInfo.getFailMsg());
+ in.close();
+
+ // delete file
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
new file mode 100644
index 00000000000000..a47c9fc924fd0d
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -0,0 +1,465 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.planner.StreamLoadPlanner;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Status;
+import org.apache.doris.proto.Types;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.StreamLoadTask;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanFragmentExecParams;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TransactionState;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class CanalSyncDataTest {
+ private static final Logger LOG = LogManager.getLogger(CanalSyncDataTest.class);
+
+ private String binlogFile = "mysql-bin.000001";
+ private long offset = 0;
+ private long nextId = 1000L;
+ private int batchSize = 8192;
+
+ ReentrantLock getLock;
+
+ CanalConnector connector;
+
+ @Mocked
+ CanalSyncJob syncJob;
+ @Mocked
+ Database database;
+ @Mocked
+ OlapTable table;
+ @Mocked
+ Catalog catalog;
+ @Mocked
+ Backend backend;
+ @Mocked
+ StreamLoadTask streamLoadTask;
+ @Mocked
+ StreamLoadPlanner streamLoadPlanner;
+ @Mocked
+ SystemInfoService systemInfoService;
+
+ InternalService.PExecPlanFragmentResult beginOkResult = InternalService.PExecPlanFragmentResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0).build()).build(); // begin txn OK
+
+ InternalService.PExecPlanFragmentResult beginFailResult = InternalService.PExecPlanFragmentResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(1).build()).build(); // begin txn CANCELLED
+
+ InternalService.PCommitResult commitOkResult = InternalService.PCommitResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0).build()).build(); // commit txn OK
+
+ InternalService.PCommitResult commitFailResult = InternalService.PCommitResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(1).build()).build(); // commit txn CANCELLED
+
+ InternalService.PRollbackResult abortOKResult = InternalService.PRollbackResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0).build()).build(); // abort txn OK
+
+ InternalService.PSendDataResult sendDataOKResult = InternalService.PSendDataResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0).build()).build(); // send data OK
+
+ @Before
+ public void setUp() throws Exception {
+
+ List backendIds = Lists.newArrayList(104L);
+ Map map = Maps.newHashMap();
+ map.put(104L, backend);
+ ImmutableMap backendMap = ImmutableMap.copyOf(map);
+ TExecPlanFragmentParams execPlanFragmentParams = new TExecPlanFragmentParams().setParams(new TPlanFragmentExecParams()
+ .setFragmentInstanceId(new TUniqueId())
+ .setPerNodeScanRanges(Maps.newHashMap()));
+
+ new Expectations() {
+ {
+ catalog.getNextId();
+ minTimes = 0;
+ result = 101L;
+
+ syncJob.getId();
+ minTimes = 0;
+ result = 100L;
+
+ database.getId();
+ minTimes = 0;
+ result = 102L;
+
+ table.getId();
+ minTimes = 0;
+ result = 103L;
+
+ table.getName();
+ minTimes = 0;
+ result = "testTbl";
+
+ streamLoadPlanner.plan((TUniqueId) any);
+ minTimes = 0;
+ result = execPlanFragmentParams;
+
+ systemInfoService.seqChooseBackendIds(anyInt, anyBoolean, anyBoolean, anyString);
+ minTimes = 0;
+ result = backendIds;
+
+ systemInfoService.getIdToBackend();
+ minTimes = 0;
+ result = backendMap;
+
+ Catalog.getCurrentCatalog();
+ minTimes = 0;
+ result = catalog;
+
+ Catalog.getCurrentSystemInfo();
+ minTimes = 0;
+ result = systemInfoService;
+ }
+ };
+
+ connector = CanalConnectors.newSingleConnector(
+ new InetSocketAddress("127.0.0.1", 11111), "test", "user", "passwd");
+
+ new MockUp() {
+ @Mock
+ void connect() throws CanalClientException {
+ }
+ @Mock
+ void disconnect() throws CanalClientException {
+ }
+ @Mock
+ Message getWithoutAck(int var1, Long var2, TimeUnit var3) throws CanalClientException {
+ offset += batchSize; // Simply set one entry as one byte
+ return CanalTestUtil.fetchMessage(
+ ++nextId, false, batchSize, binlogFile, offset, "mysql_db", "mysql_tbl");
+ }
+ @Mock
+ void rollback() throws CanalClientException {
+ }
+ @Mock
+ void ack(long var1) throws CanalClientException {
+ }
+ @Mock
+ void subscribe(String var1) throws CanalClientException {
+ }
+ };
+
+ getLock = new ReentrantLock();
+ }
+
+ @Test
+ public void testBeginTxnFail(@Mocked GlobalTransactionMgr transactionMgr) throws Exception {
+
+ new Expectations() {
+ {
+ transactionMgr.beginTransaction(anyLong, (List) any, anyString,
+ (TransactionState.TxnCoordinator) any, (TransactionState.LoadJobSourceType) any, anyLong);
+ minTimes = 0;
+ result = new AnalysisException("test exception");
+
+ Catalog.getCurrentGlobalTransactionMgr();
+ minTimes = 0;
+ result = transactionMgr;
+ }
+ };
+
+ CanalSyncDataConsumer consumer = new CanalSyncDataConsumer(
+ syncJob, connector, getLock, false);
+ CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
+ syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
+ CanalSyncChannel channel = new CanalSyncChannel(
+ syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+
+ Map idToChannels = Maps.newHashMap();
+ idToChannels.put(channel.getId(), channel);
+ consumer.setChannels(idToChannels);
+
+ channel.start();
+ consumer.start();
+ receiver.start();
+
+ try {
+ Thread.sleep(3000L);
+ } finally {
+ receiver.stop();
+ consumer.stop();
+ channel.stop();
+ }
+
+ Assert.assertEquals("position:N/A", consumer.getPositionInfo());
+ LOG.info(consumer.getPositionInfo());
+ }
+
+ @Test
+ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr,
+ @Mocked BackendServiceProxy backendServiceProxy,
+ @Mocked Future execFuture,
+ @Mocked Future commitFuture,
+ @Mocked Future sendDataFuture) throws Exception {
+
+ new Expectations() {
+ {
+ transactionMgr.beginTransaction(anyLong, (List) any, anyString,
+ (TransactionState.TxnCoordinator) any, (TransactionState.LoadJobSourceType) any, anyLong);
+ minTimes = 0;
+ result = 105L;
+
+ backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+ minTimes = 0;
+ result = execFuture;
+
+ backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any);
+ minTimes = 0;
+ result = commitFuture;
+
+ backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (List) any);
+ minTimes = 0;
+ result = sendDataFuture;
+
+ execFuture.get(anyLong, (TimeUnit) any);
+ minTimes = 0;
+ result = beginOkResult;
+
+ commitFuture.get(anyLong, (TimeUnit) any);
+ minTimes = 0;
+ result = commitOkResult;
+
+ sendDataFuture.get(anyLong, (TimeUnit) any);
+ minTimes = 0;
+ result = sendDataOKResult;
+
+ Catalog.getCurrentGlobalTransactionMgr();
+ minTimes = 0;
+ result = transactionMgr;
+
+ BackendServiceProxy.getInstance();
+ minTimes = 0;
+ result = backendServiceProxy;
+ }
+ };
+
+ CanalSyncDataConsumer consumer = new CanalSyncDataConsumer(
+ syncJob, connector, getLock, false);
+ CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
+ syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
+ CanalSyncChannel channel = new CanalSyncChannel(
+ syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+
+ Map idToChannels = Maps.newHashMap();
+ idToChannels.put(channel.getId(), channel);
+ consumer.setChannels(idToChannels);
+
+ channel.start();
+ consumer.start();
+ receiver.start();
+
+ try {
+ Thread.sleep(Config.sync_commit_interval_second * 1000);
+ } finally {
+ receiver.stop();
+ consumer.stop();
+ channel.stop();
+ }
+
+ LOG.info(consumer.getPositionInfo());
+ }
+
+ @Test
+ public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr,
+ @Mocked BackendServiceProxy backendServiceProxy,
+ @Mocked Future execFuture,
+ @Mocked Future abortFuture) throws Exception {
+
+ new Expectations() {
+ {
+ transactionMgr.beginTransaction(anyLong, (List) any, anyString,
+ (TransactionState.TxnCoordinator) any, (TransactionState.LoadJobSourceType) any, anyLong);
+ minTimes = 0;
+ result = 105L;
+
+ backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+ minTimes = 0;
+ result = execFuture;
+
+ backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any);
+ minTimes = 0;
+ result = abortFuture;
+
+ execFuture.get(anyLong, (TimeUnit) any);
+ minTimes = 0;
+ result = beginFailResult;
+
+ abortFuture.get(anyLong, (TimeUnit) any);
+ minTimes = 0;
+ result = abortOKResult;
+
+ Catalog.getCurrentGlobalTransactionMgr();
+ minTimes = 0;
+ result = transactionMgr;
+
+ BackendServiceProxy.getInstance();
+ minTimes = 0;
+ result = backendServiceProxy;
+ }
+ };
+
+ CanalSyncDataConsumer consumer = new CanalSyncDataConsumer(
+ syncJob, connector, getLock, false);
+ CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
+ syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
+ CanalSyncChannel channel = new CanalSyncChannel(
+ syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+
+ Map idToChannels = Maps.newHashMap();
+ idToChannels.put(channel.getId(), channel);
+ consumer.setChannels(idToChannels);
+
+ channel.start();
+ consumer.start();
+ receiver.start();
+
+ try {
+ Thread.sleep(3000L);
+ } finally {
+ receiver.stop();
+ consumer.stop();
+ channel.stop();
+ }
+
+ Assert.assertEquals("position:N/A", consumer.getPositionInfo());
+ LOG.info(consumer.getPositionInfo());
+ }
+
+ @Test
+ public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr,
+ @Mocked BackendServiceProxy backendServiceProxy,
+ @Mocked Future execFuture,
+ @Mocked Future commitFuture,
+ @Mocked Future abortFuture,
+ @Mocked Future sendDataFuture) throws Exception {
+
+ new Expectations() {
+ {
+ transactionMgr.beginTransaction(anyLong, (List) any, anyString,
+ (TransactionState.TxnCoordinator) any, (TransactionState.LoadJobSourceType) any, anyLong);
+ minTimes = 0;
+ result = 105L;
+
+ backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+ minTimes = 0;
+ result = execFuture;
+
+ backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any);
+ minTimes = 0;
+ result = commitFuture;
+
+ backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any);
+ minTimes = 0;
+ result = abortFuture;
+
+ backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (List) any);
+ minTimes = 0;
+ result = sendDataFuture;
+
+ execFuture.get(anyLong, (TimeUnit) any);
+ minTimes = 0;
+ result = beginOkResult;
+
+ commitFuture.get(anyLong, (TimeUnit) any);
+ minTimes = 0;
+ result = commitFailResult;
+
+ abortFuture.get(anyLong, (TimeUnit) any);
+ minTimes = 0;
+ result = abortOKResult;
+
+ sendDataFuture.get(anyLong, (TimeUnit) any);
+ minTimes = 0;
+ result = sendDataOKResult;
+
+ Catalog.getCurrentGlobalTransactionMgr();
+ minTimes = 0;
+ result = transactionMgr;
+
+ BackendServiceProxy.getInstance();
+ minTimes = 0;
+ result = backendServiceProxy;
+ }
+ };
+
+ CanalSyncDataConsumer consumer = new CanalSyncDataConsumer(
+ syncJob, connector, getLock, false);
+ CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
+ syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
+ CanalSyncChannel channel = new CanalSyncChannel(
+ syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+
+ Map idToChannels = Maps.newHashMap();
+ idToChannels.put(channel.getId(), channel);
+ consumer.setChannels(idToChannels);
+
+ channel.start();
+ consumer.start();
+ receiver.start();
+
+ try {
+ Thread.sleep(3000L);
+ } finally {
+ receiver.stop();
+ consumer.stop();
+ channel.stop();
+ }
+
+ Assert.assertEquals("position:N/A", consumer.getPositionInfo());
+ LOG.info(consumer.getPositionInfo());
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncJobTest.java
new file mode 100644
index 00000000000000..4d4a8d1485a84b
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncJobTest.java
@@ -0,0 +1,416 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.analysis.BinlogDesc;
+import org.apache.doris.analysis.ChannelDescription;
+import org.apache.doris.analysis.CreateDataSyncJobStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.sync.DataSyncJobType;
+import org.apache.doris.load.sync.SyncChannel;
+import org.apache.doris.load.sync.SyncFailMsg;
+import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.SyncJob;
+import org.apache.doris.load.sync.SyncJob.JobState;
+import org.apache.doris.load.sync.SyncJob.SyncJobUpdateStateInfo;
+import org.apache.doris.persist.EditLog;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class CanalSyncJobTest {
+ private static final Logger LOG = LogManager.getLogger(CanalSyncJobTest.class);
+
+ private long jobId;
+ private long dbId;
+ private String dbName;
+ private String tblName;
+ private String jobName;
+ private Catalog catalog;
+ private Map properties;
+
+ @Mocked
+ EditLog editLog;
+
+ @Injectable
+ Database database;
+
+ @Injectable
+ OlapTable table;
+
+ @Before
+ public void setUp() {
+ jobId = 1L;
+ dbId = 10000L;
+ dbName = "testDb";
+ tblName = "testTbl";
+ jobName = "testJob";
+ properties = Maps.newHashMap();
+ properties.put(CanalSyncJob.CANAL_SERVER_IP, "127.0.0.1");
+ properties.put(CanalSyncJob.CANAL_SERVER_PORT, "11111");
+ properties.put(CanalSyncJob.CANAL_DESTINATION, "test");
+ properties.put(CanalSyncJob.CANAL_USERNAME, "test_user");
+ properties.put(CanalSyncJob.CANAL_PASSWORD, "test_password");
+
+ catalog = Deencapsulation.newInstance(Catalog.class);
+ new Expectations(catalog) {
+ {
+ catalog.getDb(10000L);
+ minTimes = 0;
+ result = database;
+
+ catalog.getDb("testDb");
+ minTimes = 0;
+ result = database;
+
+ catalog.getEditLog();
+ minTimes = 0;
+ result = editLog;
+
+ Catalog.getCurrentCatalog();
+ minTimes = 0;
+ result = catalog;
+ }
+ };
+
+ new Expectations(database) {
+ {
+ database.getId();
+ minTimes = 0;
+ result = dbId;
+
+ database.getTable("testTbl");
+ minTimes = 0;
+ result = table;
+ }
+ };
+
+ new Expectations(table) {
+ {
+ table.getName();
+ minTimes = 0;
+ result = tblName;
+
+ table.getKeysType();
+ minTimes = 0;
+ result = KeysType.UNIQUE_KEYS;
+
+ table.hasDeleteSign();
+ minTimes = 0;
+ result = true;
+ }
+ };
+
+ new MockUp() {
+ @Mock
+ public void startup() {
+ }
+ @Mock
+ public void shutdown(boolean needCleanUp) {
+ }
+ @Mock
+ public void registerChannels(List channels) {
+ }
+ };
+ }
+
+ @Test
+ public void testCreateFromStmtWithNoDatabase(@Injectable CreateDataSyncJobStmt stmt) {
+ new Expectations() {
+ {
+ stmt.getDbName();
+ result = "";
+ }
+ };
+
+ try {
+ SyncJob.fromStmt(jobId, stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreateFromStmtWithoutBinlog(@Injectable CreateDataSyncJobStmt stmt,
+ @Injectable ChannelDescription channelDescription,
+ @Injectable BinlogDesc binlogDesc) {
+ List channelDescriptions = Lists.newArrayList();
+ channelDescriptions.add(channelDescription);
+ new Expectations() {
+ {
+ stmt.getJobName();
+ result = jobName;
+
+ stmt.getDbName();
+ result = dbName;
+
+ stmt.getDataSyncJobType();
+ result = DataSyncJobType.CANAL;
+
+ stmt.getBinlogDesc();
+ result = binlogDesc;
+
+ stmt.getChannelDescriptions();
+ result = channelDescriptions;
+
+ binlogDesc.getProperties();
+ result = Maps.newHashMap();
+ }
+ };
+
+ try {
+ SyncJob.fromStmt(jobId, stmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreateFromStmt(@Injectable CreateDataSyncJobStmt stmt,
+ @Injectable ChannelDescription channelDescription,
+ @Injectable BinlogDesc binlogDesc) {
+
+ List channelDescriptions = Lists.newArrayList();
+ channelDescriptions.add(channelDescription);
+
+ new Expectations() {
+ {
+ stmt.getJobName();
+ result = jobName;
+
+ stmt.getDbName();
+ result = dbName;
+
+ stmt.getDataSyncJobType();
+ result = DataSyncJobType.CANAL;
+
+ stmt.getBinlogDesc();
+ result = binlogDesc;
+
+ stmt.getChannelDescriptions();
+ result = channelDescriptions;
+
+ binlogDesc.getProperties();
+ result = properties;
+ }
+ };
+
+ try {
+ SyncJob syncJob = SyncJob.fromStmt(jobId, stmt);
+ CanalSyncJob canalSyncJob = (CanalSyncJob) syncJob;
+ Assert.assertEquals(jobId, canalSyncJob.getId());
+ Assert.assertEquals(jobName, canalSyncJob.getJobName());
+ Assert.assertEquals(dbId, canalSyncJob.getDbId());
+ Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+ Assert.assertEquals(DataSyncJobType.CANAL, canalSyncJob.getJobType());
+ Assert.assertNull(canalSyncJob.getFailMsg());
+
+ } catch (DdlException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testExecute(@Injectable ChannelDescription channelDescription,
+ @Injectable BinlogDesc binlogDesc) {
+ List channelDescriptions = Lists.newArrayList();
+ channelDescriptions.add(channelDescription);
+
+ new Expectations() {
+ {
+ binlogDesc.getProperties();
+ result = properties;
+
+ channelDescription.getTargetTable();
+ result = tblName;
+
+ channelDescription.getSrcDatabase();
+ result = "mysqlDb";
+
+ channelDescription.getSrcTableName();
+ result = "mysqlTbl";
+
+ channelDescription.getColNames();
+ result = null;
+
+ channelDescription.getPartitionNames();
+ result = null;
+ }
+ };
+
+ try {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ canalSyncJob.setChannelDescriptions(channelDescriptions);
+ canalSyncJob.checkAndSetBinlogInfo(binlogDesc);
+ Assert.assertEquals(jobId, canalSyncJob.getId());
+ Assert.assertEquals(jobName, canalSyncJob.getJobName());
+ Assert.assertEquals(dbId, canalSyncJob.getDbId());
+ Assert.assertEquals(DataSyncJobType.CANAL, canalSyncJob.getJobType());
+ Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+ Assert.assertNull(canalSyncJob.getFailMsg());
+ // execute job
+ canalSyncJob.execute();
+ Assert.assertTrue(canalSyncJob.isInit());
+ Assert.assertTrue(canalSyncJob.isRunning());
+ Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+ } catch (UserException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPauseAndResumeJob(@Injectable BinlogDesc binlogDesc) {
+ new MockUp() {
+ @Mock
+ public void startup() {
+ }
+ @Mock
+ public void shutdown(boolean needCleanUp) {
+ }
+ @Mock
+ public void registerChannels(List channels) {
+ }
+ };
+
+ new MockUp() {
+ @Mock
+ public void initChannels() {
+ }
+ };
+
+ new Expectations() {
+ {
+ binlogDesc.getProperties();
+ result = properties;
+ }
+ };
+
+ try {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ canalSyncJob.checkAndSetBinlogInfo(binlogDesc);
+ Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+ // run job
+ canalSyncJob.execute();
+ Assert.assertTrue(canalSyncJob.isRunning());
+ // pause job
+ canalSyncJob.pause();
+ Assert.assertTrue(canalSyncJob.isPaused());
+ // resume job
+ canalSyncJob.resume();
+ Assert.assertTrue(canalSyncJob.isRunning());
+ } catch (UserException e) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testCancelJob(@Injectable BinlogDesc binlogDesc) {
+
+ new MockUp() {
+ @Mock
+ public void initChannels() {
+ }
+ };
+
+ new Expectations() {
+ {
+ binlogDesc.getProperties();
+ result = properties;
+ }
+ };
+
+ try {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ canalSyncJob.checkAndSetBinlogInfo(binlogDesc);
+ Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+ // run job
+ canalSyncJob.execute();
+ Assert.assertTrue(canalSyncJob.isRunning());
+ // cancel job
+ canalSyncJob.cancel(MsgType.USER_CANCEL, "user cancel");
+ Assert.assertTrue(canalSyncJob.isCancelled());
+ Assert.assertTrue(canalSyncJob.isCompleted());
+ Assert.assertEquals(MsgType.USER_CANCEL, canalSyncJob.getFailMsg().getMsgType());
+ Assert.assertEquals("user cancel", canalSyncJob.getFailMsg().getMsg());
+ } catch (UserException e) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testReplayUpdateState(@Injectable ChannelDescription channelDescription,
+ @Injectable BinlogDesc binlogDesc) {
+ List channelDescriptions = Lists.newArrayList();
+ channelDescriptions.add(channelDescription);
+
+ new Expectations() {
+ {
+ binlogDesc.getProperties();
+ result = properties;
+ channelDescription.getTargetTable();
+ result = tblName;
+ channelDescription.getSrcDatabase();
+ result = "mysqlDb";
+ channelDescription.getSrcTableName();
+ result = "mysqlTbl";
+ channelDescription.getColNames();
+ result = null;
+ channelDescription.getPartitionNames();
+ result = null;
+ }
+ };
+
+ try {
+ CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ canalSyncJob.setChannelDescriptions(channelDescriptions);
+ canalSyncJob.checkAndSetBinlogInfo(binlogDesc);
+ Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+ SyncJobUpdateStateInfo info = new SyncJobUpdateStateInfo(
+ jobId, JobState.CANCELLED, 1622469769L, -1L, -1L,
+ new SyncFailMsg(MsgType.USER_CANCEL, "user cancel"));
+ canalSyncJob.replayUpdateSyncJobState(info);
+ Assert.assertTrue(canalSyncJob.isCancelled());
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+ } catch (UserException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalTestUtil.java
new file mode 100644
index 00000000000000..d2da579002ad09
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalTestUtil.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.sql.Types;
+import java.util.List;
+
+public class CanalTestUtil {
+
+ public static CanalEntry.Column buildColumn(String colName, int colValue) {
+ CanalEntry.Column.Builder columnBuilder = CanalEntry.Column.newBuilder();
+ Serializable value = Integer.valueOf(colValue);
+ columnBuilder.setName(colName);
+ columnBuilder.setIsKey(true);
+ columnBuilder.setMysqlType("bigint");
+ columnBuilder.setIndex(0);
+ columnBuilder.setIsNull(false);
+ columnBuilder.setValue(value.toString());
+ columnBuilder.setSqlType(Types.BIGINT);
+ columnBuilder.setUpdated(false);
+ return columnBuilder.build();
+ }
+
+ public static CanalEntry.RowChange buildRowChange() {
+ CanalEntry.RowData.Builder rowDataBuilder = CanalEntry.RowData.newBuilder();
+ CanalEntry.RowChange.Builder rowChangeBuider = CanalEntry.RowChange.newBuilder();
+ rowChangeBuider.setIsDdl(false);
+ rowChangeBuider.setEventType(CanalEntry.EventType.INSERT);
+ rowDataBuilder.addAfterColumns(buildColumn("a", 1));
+ rowDataBuilder.addAfterColumns(buildColumn("b", 2));
+ rowChangeBuider.addRowDatas(rowDataBuilder.build());
+ return rowChangeBuider.build();
+ }
+
+ public static CanalEntry.Entry buildEntry(String binlogFile, long offset, long timestamp) {
+ CanalEntry.Header.Builder headerBuilder = CanalEntry.Header.newBuilder();
+ headerBuilder.setLogfileName(binlogFile);
+ headerBuilder.setLogfileOffset(offset);
+ headerBuilder.setExecuteTime(timestamp);
+ CanalEntry.Entry.Builder entryBuilder = CanalEntry.Entry.newBuilder();
+ entryBuilder.setHeader(headerBuilder.build());
+ entryBuilder.setEntryType(CanalEntry.EntryType.ROWDATA);
+ entryBuilder.setStoreValue(buildRowChange().toByteString());
+ return entryBuilder.build();
+ }
+
+ public static CanalEntry.Entry buildEntry(String binlogFile, long offset, long timestamp, String schemaName, String tableName) {
+ CanalEntry.Header.Builder headerBuilder = CanalEntry.Header.newBuilder();
+ headerBuilder.setLogfileName(binlogFile);
+ headerBuilder.setLogfileOffset(offset);
+ headerBuilder.setExecuteTime(timestamp);
+ headerBuilder.setSchemaName(schemaName);
+ headerBuilder.setTableName(tableName);
+ CanalEntry.Entry.Builder entryBuilder = CanalEntry.Entry.newBuilder();
+ entryBuilder.setHeader(headerBuilder.build());
+ entryBuilder.setEntryType(CanalEntry.EntryType.ROWDATA);
+ entryBuilder.setStoreValue(buildRowChange().toByteString());
+ return entryBuilder.build();
+ }
+
+ public static Message fetchEOFMessage() {
+ return new Message(-1L, Lists.newArrayList());
+ }
+
+ public static Message fetchMessage(long id, boolean isRaw, int batchSize, String binlogFile, long offset, String schemaName, String tableName) {
+ List entries = Lists.newArrayList();
+ for (int i = 0 ; i < batchSize; i++) {
+ entries.add(buildEntry(binlogFile, offset++, 1024, schemaName, tableName));
+ }
+ return new Message(id, isRaw, entries);
+ }
+
+
+}
\ No newline at end of file