From 82a72c14c67224e1229f0bf866f8ced71bb62059 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 26 Nov 2018 16:45:28 +0800 Subject: [PATCH 1/2] Optimize the publish logic of streaming load 1. Only collect all error replicas if the publish task times out. 2. Add 2 metrics to monitor the success or failure of txn. --- be/src/http/action/stream_load.cpp | 1 + be/test/olap/file_helper_test.cpp | 4 +- .../org/apache/doris/catalog/Replica.java | 33 ++++++- .../org/apache/doris/master/MasterImpl.java | 4 +- .../org/apache/doris/metric/MetricRepo.java | 15 ++- .../doris/service/FrontendServiceImpl.java | 3 +- .../apache/doris/task/PublishVersionTask.java | 10 +- .../transaction/GlobalTransactionMgr.java | 52 ++++++----- .../transaction/PublishVersionDaemon.java | 92 ++++++++++--------- .../doris/transaction/TransactionState.java | 16 ++++ 10 files changed, 148 insertions(+), 82 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index c608de08bc1465..a2902daa76a04b 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -330,6 +330,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { } auto str = ctx->to_json(); HttpChannel::send_reply(req, str); + k_streaming_load_current_processing.increment(-1); return -1; } return 0; diff --git a/be/test/olap/file_helper_test.cpp b/be/test/olap/file_helper_test.cpp index ed21e001bb27e2..90d8e46aceca87 100644 --- a/be/test/olap/file_helper_test.cpp +++ b/be/test/olap/file_helper_test.cpp @@ -91,10 +91,10 @@ TEST_F(FileHandlerTest, TestWrite) { ASSERT_EQ(22, length); - char* large_bytes2[(1 << 12)]; + char* large_bytes2[(1 << 10)]; memset(large_bytes2, 0, sizeof(char)*((1 << 12))); int i = 1; - while (i < 1 << 20) { + while (i < 1 << 17) { file_handler.write(large_bytes2, ((1 << 12))); ++i; } diff --git a/fe/src/main/java/org/apache/doris/catalog/Replica.java b/fe/src/main/java/org/apache/doris/catalog/Replica.java index f60460572fb78b..a51f199577601c 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/src/main/java/org/apache/doris/catalog/Replica.java @@ -20,6 +20,7 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -178,6 +179,25 @@ public synchronized void updateVersionInfo(long newVersion, long newVersionHash, lastSuccessVersion, lastSuccessVersionHash, dataSize, rowCount); } + /* last failed version: LFV + * last success version: LSV + * version: V + * + * Case 1: + * If LFV > LSV, set LSV back to V, which indicates that the versions between LSV and LFV are invalid. + * A clone task will clone the versions between LSV and LFV. + * + * Case 2: + * LFV changed, set LSV back to V. This is just the same as Case 1, because LFV must be larger than LSV. + * + * Case 3: + * LFV remains unchanged, just update LSV, and then check if it falls into Case 1. + * + * Case 4: + * V is larger than or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may + * happen when a clone task finishes and reports version V, but the LSV is already larger than V, + * and we know that the versions between V and LSV are valid, so move V forward to LSV. + */
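To make the four cases above concrete, here is a small, self-contained walk-through with made-up version numbers (plain longs only; version hashes, persistence and the real Replica/updateReplicaInfo code are intentionally left out):

public class ReplicaVersionExample {
    public static void main(String[] args) {
        long v = 10, lsv = 10, lfv = -1;   // healthy replica: V == LSV, no last failed version

        // A publish fails at version 12. Case 2: LFV changes, so LSV is rolled back to V,
        // marking versions (10, 12] as invalid on this replica; a clone task must recover them.
        lfv = 12;
        lsv = v;

        // The clone task finishes and reports version 12. Case 4: V >= LFV, so LFV is reset,
        // and if V had fallen behind LSV in the meantime, V would be moved forward to LSV.
        v = 12;
        if (v >= lfv) {
            lfv = -1;
            if (v < lsv) {
                v = lsv;
            }
        }
        System.out.println("V=" + v + ", LSV=" + lsv + ", LFV=" + lfv);   // V=12, LSV=10, LFV=-1
    }
}

(In the real code the clone task's report would also carry a new last success version; this sketch only exercises Cases 2 and 4.)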
private void updateReplicaInfo(long newVersion, long newVersionHash, long lastFailedVersion, long lastFailedVersionHash, long lastSuccessVersion, long lastSuccessVersionHash, @@ -196,11 +216,14 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, lastSuccessVersion = this.version; lastSuccessVersionHash = this.versionHash; } + + // case 1: if (this.lastSuccessVersion <= this.lastFailedVersion) { this.lastSuccessVersion = this.version; this.lastSuccessVersionHash = this.versionHash; } + // TODO: this case is unknown, add log to observe if (this.version > lastFailedVersion && lastFailedVersion > 0) { LOG.info("current version {} is larger than last failed version {} , " + "last failed version hash {}, maybe a fatal error or be report version, print a stack here ", @@ -209,15 +232,17 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, if (lastFailedVersion != this.lastFailedVersion || this.lastFailedVersionHash != lastFailedVersionHash) { - // if last failed version changed, then set last success version to invalid version + // Case 2: if (lastFailedVersion > this.lastFailedVersion) { this.lastFailedVersion = lastFailedVersion; this.lastFailedVersionHash = lastFailedVersionHash; this.lastFailedTimestamp = System.currentTimeMillis(); } + this.lastSuccessVersion = this.version; this.lastSuccessVersionHash = this.versionHash; } else { + // Case 3: if (lastSuccessVersion >= this.lastSuccessVersion) { this.lastSuccessVersion = lastSuccessVersion; this.lastSuccessVersionHash = lastSuccessVersionHash; @@ -228,9 +253,7 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, } } - // if last failed version <= version, then last failed version is invalid - // version xxxx | last failed version xxxx | last success version xxx - // if current version == last failed version and version hash != last failed version hash, it means the version report from be is not valid + // Case 4: if (this.version > this.lastFailedVersion || this.version == this.lastFailedVersion && this.versionHash == this.lastFailedVersionHash || this.version == this.lastFailedVersion && this.lastFailedVersionHash == 0 && this.versionHash != 0) { @@ -242,7 +265,7 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, this.versionHash = this.lastSuccessVersionHash; } } - // TODO yiguolei use info log here, there maybe a lot of logs, change it to debug when concurrent load is stable + LOG.debug("update {}", this.toString()); } diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java index 3a9b7ac4dd781f..b96d4a0397b2f9 100644 --- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java @@ -46,9 +46,9 @@ import org.apache.doris.task.CloneTask; import org.apache.doris.task.CreateReplicaTask; import org.apache.doris.task.CreateRollupTask; -import org.apache.doris.task.PublishVersionTask; import org.apache.doris.task.DirMoveTask; import org.apache.doris.task.DownloadTask; +import org.apache.doris.task.PublishVersionTask; import org.apache.doris.task.PushTask; import org.apache.doris.task.SchemaChangeTask; import org.apache.doris.task.SnapshotTask; @@ -555,7 +555,7 @@ private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) { if (request.isSetError_tablet_ids()) { errorTabletIds = request.getError_tablet_ids(); } - PublishVersionTask publishVersionTask = (PublishVersionTask)task; + PublishVersionTask publishVersionTask = (PublishVersionTask) task;
publishVersionTask.addErrorTablets(errorTabletIds); publishVersionTask.setIsFinished(true); AgentTaskQueue.removeTask(publishVersionTask.getBackendId(), diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java index 0b98e108471d83..111c4df89e41fb 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -17,9 +17,6 @@ package org.apache.doris.metric; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; - import org.apache.doris.alter.Alter; import org.apache.doris.alter.AlterJob.JobType; import org.apache.doris.catalog.Catalog; @@ -33,6 +30,10 @@ import org.apache.doris.service.ExecuteEnv; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -57,6 +58,8 @@ public final class MetricRepo { public static LongCounterMetric COUNTER_EDIT_LOG_READ; public static LongCounterMetric COUNTER_IMAGE_WRITE; public static LongCounterMetric COUNTER_IMAGE_PUSH; + public static LongCounterMetric COUNTER_TXN_FAILED; + public static LongCounterMetric COUNTER_TXN_SUCCESS; public static Histogram HISTO_QUERY_LATENCY; public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY; @@ -161,6 +164,12 @@ public Long getValue() { COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push", "counter of image succeeded in pushing to other frontends"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH); + COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success", + "counter of successful transactions"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS); + COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", + "counter of failed transactions"); + PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_FAILED); // 3. 
histogram HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms")); diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ffce942bc97bdd..9c6d2668c0be16 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -634,7 +634,7 @@ public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws return result; } - // return true if commit success and publish success, return false if publish timout + // return true if commit success and publish success, return false if publish timeout private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserException { String cluster = request.getCluster(); if (Strings.isNullOrEmpty(cluster)) { @@ -655,6 +655,7 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce } throw new UserException("unknown database, database=" + dbName); } + return Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( db, request.getTxnId(), TabletCommitInfo.fromThrift(request.getCommitInfos()), diff --git a/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java index 97e31a6d790e3b..c65cfe05a96409 100644 --- a/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java +++ b/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java @@ -17,15 +17,15 @@ package org.apache.doris.task; -import java.util.ArrayList; -import java.util.List; +import org.apache.doris.thrift.TPartitionVersionInfo; +import org.apache.doris.thrift.TPublishVersionRequest; +import org.apache.doris.thrift.TTaskType; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.doris.thrift.TPartitionVersionInfo; -import org.apache.doris.thrift.TPublishVersionRequest; -import org.apache.doris.thrift.TTaskType; +import java.util.ArrayList; +import java.util.List; public class PublishVersionTask extends AgentTask { private static final Logger LOG = LogManager.getLogger(PublishVersionTask.class); diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index d3da6a0b8c0afd..35bd20c897e23d 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -17,12 +17,6 @@ package org.apache.doris.transaction; -import com.google.common.base.Preconditions; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - import org.apache.doris.alter.RollupJob; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -49,6 +43,13 @@ import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -82,6 +83,7 @@ public class GlobalTransactionMgr { // transactionId 
-> TransactionState private Map idToTransactionState; + // db id -> (label -> txn id) private com.google.common.collect.Table dbIdToTxnLabels; private Map runningTxnNums; private TransactionIdGenerator idGenerator; @@ -107,7 +109,7 @@ public long beginTransaction(long dbId, String label, String coordinator, LoadJo throws AnalysisException, LabelAlreadyExistsException, BeginTransactionException { if (Config.disable_load_job) { - throw new BeginTransactionException("disable_load_job is set to true, all load job is prevented"); + throw new BeginTransactionException("disable_load_job is set to true, all load jobs are prevented"); } writeLock(); @@ -185,12 +187,11 @@ public void deleteTransaction(long transactionId) { */ public void commitTransaction(long dbId, long transactionId, List tabletCommitInfos) throws MetaNotFoundException, TransactionCommitFailedException { - if (Config.disable_load_job) { - throw new TransactionCommitFailedException("disable_load_job is set to true, all load job is prevented"); + throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented"); } - LOG.debug("try to commit transaction:[{}]", transactionId); + LOG.debug("try to commit transaction: {}", transactionId); if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) { throw new TransactionCommitFailedException("all partitions have no load data"); } @@ -260,7 +261,8 @@ public void commitTransaction(long dbId, long transactionId, List allBackends = Catalog.getCurrentSystemInfo().getBackendIds(false); @@ -97,13 +98,14 @@ private void publishVersion() { } } Set publishBackends = transactionState.getPublishVersionTasks().keySet(); + // public version tasks are not persisted in catalog, so publishBackends may be empty. + // so we have to try publish to all backends; if (publishBackends.isEmpty()) { // could not just add to it, should new a new object, or the back map will destroyed publishBackends = Sets.newHashSet(); - // this is useful if fe master transfer to another master, because publish version task is not - // persistent to edit log, then it should publish to all backends publishBackends.addAll(allBackends); } + for (long backendId : publishBackends) { PublishVersionTask task = new PublishVersionTask(backendId, transactionState.getTransactionId(), @@ -130,6 +132,7 @@ private void publishVersion() { } Map transTasks = transactionState.getPublishVersionTasks(); Set transErrorReplicas = Sets.newHashSet(); + List unfinishedTasks = Lists.newArrayList(); for (PublishVersionTask publishVersionTask : transTasks.values()) { if (publishVersionTask.isFinished()) { // sometimes backend finish publish version task, but it maybe failed to change transactionid to version for some tablets @@ -145,44 +148,48 @@ private void publishVersion() { } } } else { - // if task is not finished in time, then set all replica in the backend to error state - List versionInfos = publishVersionTask.getPartitionVersionInfos(); - Set errorPartitionIds = Sets.newHashSet(); - for (TPartitionVersionInfo versionInfo : versionInfos) { - errorPartitionIds.add(versionInfo.getPartition_id()); - } - if (errorPartitionIds.isEmpty()) { - continue; - } - List tabletIds = tabletInvertedIndex.getTabletIdsByBackendId(publishVersionTask.getBackendId()); - for (long tabletId : tabletIds) { - long partitionId = tabletInvertedIndex.getPartitionId(tabletId); - if (errorPartitionIds.contains(partitionId)) { - Replica replica = tabletInvertedIndex.getReplica(tabletId, publishVersionTask.getBackendId()); - 
transErrorReplicas.add(replica); + unfinishedTasks.add(publishVersionTask); + } + } + + boolean shouldFinishTxn = false; + if (!unfinishedTasks.isEmpty()) { + if (transactionState.isPublishTimeout()) { + // transaction's publish is timeout, but there still has unfinished tasks. + // we need to collect all error replicas, and try to finish this txn. + for (PublishVersionTask unfinishedTask : unfinishedTasks) { + // set all replica in the backend to error state + List versionInfos = unfinishedTask.getPartitionVersionInfos(); + Set errorPartitionIds = Sets.newHashSet(); + for (TPartitionVersionInfo versionInfo : versionInfos) { + errorPartitionIds.add(versionInfo.getPartition_id()); + } + if (errorPartitionIds.isEmpty()) { + continue; + } + + // TODO(cmy): this is inefficient, but just keep it simple. will change it later. + List tabletIds = tabletInvertedIndex.getTabletIdsByBackendId(unfinishedTask.getBackendId()); + for (long tabletId : tabletIds) { + long partitionId = tabletInvertedIndex.getPartitionId(tabletId); + if (errorPartitionIds.contains(partitionId)) { + Replica replica = tabletInvertedIndex.getReplica(tabletId, + unfinishedTask.getBackendId()); + transErrorReplicas.add(replica); + } } } + + shouldFinishTxn = true; } + // transaction's publish is not timeout, waiting next round. + } else { + // all publish tasks are finished, try to finish this txn. + shouldFinishTxn = true; } - // the timeout value is related with backend num - long timeoutMillis = Math.min(Config.publish_version_timeout_second * transTasks.size() * 1000, 10000); - // the minimal internal should be 3s - timeoutMillis = Math.max(timeoutMillis, 3000); - // should not wait clone replica or replica's that with last failed version > 0 - // if wait for them, the publish process will be very slow - int normalReplicasNotRespond = 0; - Set allErrorReplicas = Sets.newHashSet(); - for (Replica replica : transErrorReplicas) { - allErrorReplicas.add(replica.getId()); - if (replica.getState() != ReplicaState.CLONE - && replica.getLastFailedVersion() < 1) { - ++normalReplicasNotRespond; - } - } - if (normalReplicasNotRespond == 0 - || System.currentTimeMillis() - transactionState.getPublishVersionTime() > timeoutMillis) { - LOG.debug("transTask num {}, error replica id num {}", transTasks.size(), transErrorReplicas.size()); + if (shouldFinishTxn) { + Set allErrorReplicas = transErrorReplicas.stream().map(v -> v.getId()).collect(Collectors.toSet()); globalTransactionMgr.finishTransaction(transactionState.getTransactionId(), allErrorReplicas); if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { // if finish transaction state failed, then update publish version time, should check @@ -192,11 +199,12 @@ private void publishVersion() { transactionState, transErrorReplicas.size()); } } + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); } } - } + } // end for readyTransactionStates } } diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index f876349011d1dc..b79505665ae2e5 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -17,8 +17,10 @@ package org.apache.doris.transaction; +import 
org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.task.PublishVersionTask; import com.google.common.collect.Maps; @@ -253,6 +255,13 @@ public void setTransactionStatus(TransactionStatus transactionStatus) { this.transactionStatus = transactionStatus; if (transactionStatus == TransactionStatus.VISIBLE) { this.latch.countDown(); + if (MetricRepo.isInit.get()) { + MetricRepo.COUNTER_TXN_SUCCESS.increase(1L); + } + } else if (transactionStatus == TransactionStatus.ABORTED) { + if (MetricRepo.isInit.get()) { + MetricRepo.COUNTER_TXN_FAILED.increase(1L); + } } } @@ -321,4 +330,11 @@ public LoadJobSourceType getSourceType() { public Map getPublishVersionTasks() { return publishVersionTasks; } + + public boolean isPublishTimeout() { + // the timeout is between 3 and 10 seconds. + long timeoutMillis = Math.min(Config.publish_version_timeout_second * publishVersionTasks.size() * 1000, 10000); + timeoutMillis = Math.max(timeoutMillis, 3000); + return System.currentTimeMillis() - publishVersionTime > timeoutMillis; + } } From 7ba5fdd44a5b2cead74f0ed96ed0321e50f001b2 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 26 Nov 2018 18:54:58 +0800 Subject: [PATCH 2/2] Change publish timeout to Config.load_straggler_wait_second --- .../java/org/apache/doris/common/Config.java | 3 ++ .../doris/transaction/TransactionState.java | 5 +-- .../routineload/RoutineLoadSchedulerTest.java | 34 +++++-------------- 3 files changed, 14 insertions(+), 28 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 75ac8579d64257..fa13718a06c3fe 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -241,6 +241,9 @@ public class Config extends ConfigBase { * if (current_time - t1) > 300s, then palo will treat C as a failure node * will call transaction manager to commit the transaction and tell transaction manager * that C is failed + * + * This is also used when waiting for publish tasks + * * TODO this parameter is the default value for all job and the DBA could specify it for separate job */ @ConfField public static int load_straggler_wait_second = 300; diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index b79505665ae2e5..a5dda349560472 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -332,8 +332,9 @@ public Map getPublishVersionTasks() { } public boolean isPublishTimeout() { - // the timeout is between 3 and 10 seconds. - long timeoutMillis = Math.min(Config.publish_version_timeout_second * publishVersionTasks.size() * 1000, 10000); + // the timeout is between 3 seconds and Config.load_straggler_wait_second seconds. 
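For a sense of scale of the bound computed below: with a hypothetical Config.publish_version_timeout_second of 2, four publish tasks, and the load_straggler_wait_second default of 300 shown above, the wait is min(2 * 4 * 1000, 300 * 1000) = 8000 ms, floored at 3000 ms. A minimal sketch of the same arithmetic (illustrative only, not the actual TransactionState code):

public class PublishTimeoutExample {
    public static void main(String[] args) {
        int publishVersionTimeoutSecond = 2; // hypothetical; not a Doris default
        int loadStragglerWaitSecond = 300;   // default value shown in the Config.java hunk above
        int publishTaskNum = 4;              // publish version tasks created for the transaction

        long timeoutMillis = Math.min(publishVersionTimeoutSecond * publishTaskNum * 1000L,
                loadStragglerWaitSecond * 1000L);      // min(8000, 300000) = 8000
        timeoutMillis = Math.max(timeoutMillis, 3000); // floored at 3 seconds -> 8000 ms
        System.out.println(timeoutMillis + " ms before the publish is treated as timed out");
    }
}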
+ long timeoutMillis = Math.min(Config.publish_version_timeout_second * publishVersionTasks.size() * 1000, + Config.load_straggler_wait_second * 1000); timeoutMillis = Math.max(timeoutMillis, 3000); return System.currentTimeMillis() - publishVersionTime > timeoutMillis; } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 8f6e22ecde8a6f..f5f862bba6873a 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -17,32 +17,26 @@ package org.apache.doris.load.routineload; -import com.google.common.collect.Lists; -import mockit.Deencapsulation; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mock; -import mockit.MockUp; -import mockit.Mocked; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.persist.EditLog; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; -import org.easymock.EasyMock; + +import com.google.common.collect.Lists; + import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; + public class RoutineLoadSchedulerTest { @Test @@ -72,18 +66,6 @@ public void testNormalRunOneCycle(@Mocked Catalog catalog, Deencapsulation.setField(routineLoadJob, "kafkaPartitions", partitions); Deencapsulation.setField(routineLoadJob, "desireTaskConcurrentNum", 3); - new MockUp() { - @Mock - public SystemInfoService getCurrentSystemInfo() { - return systemInfoService; - } - - @Mock - public Catalog getCurrentCatalog() { - return catalog; - } - }; - new Expectations() { { catalog.getRoutineLoadInstance();