From bfca616bf974863c8947c0fa71e1664bce8278be Mon Sep 17 00:00:00 2001 From: yujun Date: Tue, 20 Aug 2024 23:01:53 +0800 Subject: [PATCH 1/3] update --- be/src/agent/task_worker_pool.cpp | 17 ++-- be/src/service/doris_main.cpp | 2 + .../org/apache/doris/catalog/DiskInfo.java | 4 + .../java/org/apache/doris/catalog/Tablet.java | 28 +++++-- .../apache/doris/master/ReportHandler.java | 29 ++++++- .../apache/doris/planner/OlapScanNode.java | 14 +++- .../apache/doris/catalog/QueryTabletTest.java | 84 +++++++++++++++++++ .../doris/utframe/MockedBackendFactory.java | 31 +++++++ 8 files changed, 190 insertions(+), 19 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 0b525354da9c12..f28be36a8868c6 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -693,10 +693,6 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() { } _is_doing_work = true; - // Random sleep 1~5 seconds before doing report. - // In order to avoid the problem that the FE receives many report requests at the same time - // and can not be processed. - _random_sleep(5); TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); @@ -759,9 +755,16 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() { request.__set_backend(BackendOptions::get_local_backend()); request.__isset.tablets = true; - uint64_t report_version = _s_report_version; - StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info( - &request.tablets); + uint64_t report_version; + for (int i = 0; i < 5; i++) { + request.tablets.clear(); + report_version = _s_report_version; + StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info( + &request.tablets); + if (report_version == s_report_version) { + break; + } + } if (report_version < _s_report_version) { // TODO llj This can only reduce the possibility for report error, but can't avoid it. // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 19259b5e3da9c2..3b6cf2969cfec6 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -545,6 +545,8 @@ int main(int argc, char** argv) { status.to_string()); } + exec_env->storage_engine()->notify_listeners(); + while (!doris::k_doris_exit) { #if defined(LEAK_SANITIZER) __lsan_do_leak_check(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java index b49acb2ff83cdc..666f6147e835cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -151,6 +151,10 @@ public boolean hasPathHash() { return pathHash != 0; } + public boolean isAlive() { + return state == DiskState.ONLINE; + } + public boolean isStorageMediumMatch(TStorageMedium storageMedium) { return this.storageMedium == storageMedium; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 8baac8bd71dbff..a7240895029b8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -240,9 +240,11 @@ public Multimap getNormalReplicaBackendPathMap() { } // for query - public List getQueryableReplicas(long visibleVersion, boolean allowFailedVersion) { + public List getQueryableReplicas(long visibleVersion, Map> backendAlivePathHashs, + boolean allowFailedVersion) { List allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size()); List auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size()); + List deadPathReplica = Lists.newArrayList(); for (Replica replica : replicas) { if (replica.isBad()) { continue; @@ -253,21 +255,31 @@ public List getQueryableReplicas(long visibleVersion, boolean allowFail continue; } + if (!replica.checkVersionCatchUp(visibleVersion, false)) { + continue; + } + + Set thisBeAlivePaths = backendAlivePathHashs.get(replica.getBackendId()); ReplicaState state = replica.getState(); - if (state.canQuery()) { - if (replica.checkVersionCatchUp(visibleVersion, false)) { - allQueryableReplica.add(replica); - } + // if thisBeAlivePaths contains pathHash = 0, it mean this be hadn't report disks state. + // should ignore this case. + if (replica.getPathHash() != -1 && thisBeAlivePaths != null + && !thisBeAlivePaths.contains(replica.getPathHash()) + && !thisBeAlivePaths.contains(0L)) { + deadPathReplica.add(replica); + } else if (state.canQuery()) { + allQueryableReplica.add(replica); } else if (state == ReplicaState.DECOMMISSION) { - if (replica.checkVersionCatchUp(visibleVersion, false)) { - auxiliaryReplica.add(replica); - } + auxiliaryReplica.add(replica); } } if (allQueryableReplica.isEmpty()) { allQueryableReplica = auxiliaryReplica; } + if (allQueryableReplica.isEmpty()) { + allQueryableReplica = deadPathReplica; + } if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) { long minVersionCount = Long.MAX_VALUE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 13215503f6643a..125f2553ec935e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexState; @@ -723,6 +724,15 @@ private static void deleteFromMeta(ListMultimap tabletDeleteFromMeta AgentBatchTask createReplicaBatchTask = new AgentBatchTask(); TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); Map objectPool = new HashMap(); + Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + Set backendHealthPathHashs; + if (backend == null) { + backendHealthPathHashs = Sets.newHashSet(); + } else { + backendHealthPathHashs = backend.getDisks().values().stream() + .filter(DiskInfo::isAlive) + .map(DiskInfo::getPathHash).collect(Collectors.toSet()); + } for (Long dbId : tabletDeleteFromMeta.keySet()) { Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { @@ -782,7 +792,24 @@ private static void deleteFromMeta(ListMultimap tabletDeleteFromMeta long currentBackendReportVersion = Env.getCurrentSystemInfo() .getBackendReportVersion(backendId); if (backendReportVersion < currentBackendReportVersion) { - continue; + + // if backendHealthPathHashs contains health path hash 0, + // it means this backend hadn't reported disks state, + // should ignore this case. + boolean thisReplicaOnBadDisk = replica.getPathHash() != -1L + && !backendHealthPathHashs.contains(replica.getPathHash()) + && !backendHealthPathHashs.contains(0L); + + boolean existsOtherHealthReplica = tablet.getReplicas().stream() + .anyMatch(r -> r.getBackendId() != replica.getBackendId() + && r.getVersion() >= replica.getVersion() + && r.getLastFailedVersion() == -1L + && !r.isBad()); + + // if replica is on bad disks and there are other health replicas, still delete it. + if (!(thisReplicaOnBadDisk && existsOtherHealthReplica)) { + continue; + } } if (state == ReplicaState.NORMAL || state == ReplicaState.SCHEMA_CHANGE) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 0509eabeb65f90..6342e8bdb50b9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -41,6 +41,7 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; @@ -721,7 +722,7 @@ private Collection distributionPrune( } private void addScanRangeLocations(Partition partition, - List tablets) throws UserException { + List tablets, Map> backendAlivePathHashs) throws UserException { long visibleVersion = partition.getVisibleVersion(); String visibleVersionStr = String.valueOf(visibleVersion); @@ -765,7 +766,8 @@ private void addScanRangeLocations(Partition partition, paloRange.setTabletId(tabletId); // random shuffle List && only collect one copy - List replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion); + List replicas = tablet.getQueryableReplicas(visibleVersion, + backendAlivePathHashs, skipMissingVersion); if (replicas.isEmpty()) { LOG.warn("no queryable replica found in tablet {}. visible version {}", tabletId, visibleVersion); StringBuilder sb = new StringBuilder( @@ -1077,6 +1079,12 @@ private void computeTabletInfo() throws UserException { */ Preconditions.checkState(scanBackendIds.size() == 0); Preconditions.checkState(scanTabletIds.size() == 0); + Map> backendAlivePathHashs = Maps.newHashMap(); + for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) { + backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream() + .filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet())); + } + for (Long partitionId : selectedPartitionIds) { final Partition partition = olapTable.getPartition(partitionId); final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId); @@ -1114,7 +1122,7 @@ private void computeTabletInfo() throws UserException { totalTabletsNum += selectedTable.getTablets().size(); selectedTabletsNum += tablets.size(); - addScanRangeLocations(partition, tablets); + addScanRangeLocations(partition, tablets, backendAlivePathHashs); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java new file mode 100644 index 00000000000000..24fb02b232e8f9 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.system.Backend; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class QueryTabletTest extends TestWithFeService { + + @Override + protected int backendNum() { + return 3; + } + + @Test + public void testTabletOnBadDisks() throws Exception { + createDatabase("db1"); + createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 1" + + " properties('replication_num' = '3')"); + + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1"); + OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1"); + Assertions.assertNotNull(tbl); + Tablet tablet = tbl.getPartitions().iterator().next() + .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next() + .getTablets().iterator().next(); + + List replicas = tablet.getReplicas(); + Assertions.assertEquals(3, replicas.size()); + for (Replica replica : replicas) { + Assertions.assertTrue(replica.getPathHash() != -1L); + } + + Assertions.assertEquals(replicas, + tablet.getQueryableReplicas(1L, getAlivePathHashs(), false)); + + // disk mark as bad + Env.getCurrentSystemInfo().getBackend(replicas.get(0).getBackendId()) + .getDisks().values().forEach(disk -> disk.setState(DiskInfo.DiskState.OFFLINE)); + + // lost disk + replicas.get(1).setPathHash(-123321L); + + Assertions.assertEquals(Lists.newArrayList(replicas.get(2)), + tablet.getQueryableReplicas(1L, getAlivePathHashs(), false)); + } + + private Map> getAlivePathHashs() { + Map> backendAlivePathHashs = Maps.newHashMap(); + for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) { + backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream() + .filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet())); + } + + return backendAlivePathHashs; + } + +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 665dc8163aeef0..8feff517c794e4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -39,6 +39,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentResult; import org.apache.doris.thrift.TCheckStorageFormatResult; import org.apache.doris.thrift.TCloneReq; +import org.apache.doris.thrift.TCreateTabletReq; import org.apache.doris.thrift.TDiskTrashInfo; import org.apache.doris.thrift.TDropTabletReq; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -81,7 +82,9 @@ import java.io.IOException; import java.util.List; +import java.util.Random; import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; /* * This class is used to create mock backends. @@ -184,6 +187,9 @@ public void run() { TTaskType taskType = request.getTaskType(); switch (taskType) { case CREATE: + ++reportVersion; + handleCreateTablet(request, finishTaskRequest); + break; case ALTER: ++reportVersion; break; @@ -191,6 +197,7 @@ public void run() { handleDropTablet(request, finishTaskRequest); break; case CLONE: + ++reportVersion; handleCloneTablet(request, finishTaskRequest); break; case STORAGE_MEDIUM_MIGRATE: @@ -216,6 +223,30 @@ public void run() { } } + private void handleCreateTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { + TCreateTabletReq req = request.getCreateTabletReq(); + List candDisks = backendInFe.getDisks().values().stream() + .filter(disk -> req.storage_medium == disk.getStorageMedium() && disk.isAlive()) + .collect(Collectors.toList()); + if (candDisks.isEmpty()) { + candDisks = backendInFe.getDisks().values().stream() + .filter(DiskInfo::isAlive) + .collect(Collectors.toList()); + } + DiskInfo choseDisk = candDisks.isEmpty() ? null + : candDisks.get(new Random().nextInt(candDisks.size())); + + List tabletInfos = Lists.newArrayList(); + TTabletInfo tabletInfo = new TTabletInfo(); + tabletInfo.setTabletId(req.tablet_id); + tabletInfo.setVersion(req.version); + tabletInfo.setPathHash(choseDisk == null ? -1L : choseDisk.getPathHash()); + tabletInfo.setReplicaId(req.replica_id); + tabletInfo.setUsed(true); + tabletInfos.add(tabletInfo); + finishTaskRequest.setFinishTabletInfos(tabletInfos); + } + private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { TDropTabletReq req = request.getDropTabletReq(); long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id)); From 1cec41dab53ad8b0349654346b31d0d1a90c14b0 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Tue, 20 Aug 2024 23:27:25 +0800 Subject: [PATCH 2/3] update --- .../src/test/java/org/apache/doris/catalog/QueryTabletTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java index 24fb02b232e8f9..b80544245da763 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java @@ -72,7 +72,7 @@ public void testTabletOnBadDisks() throws Exception { private Map> getAlivePathHashs() { Map> backendAlivePathHashs = Maps.newHashMap(); - for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) { + for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) { backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream() .filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet())); } From 4e23b79650466165f0e920ea5c519baae8d7e742 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Thu, 22 Aug 2024 12:02:49 +0800 Subject: [PATCH 3/3] udpate --- be/src/agent/task_worker_pool.cpp | 2 +- .../java/org/apache/doris/system/Backend.java | 5 ++ .../org/apache/doris/catalog/TabletTest.java | 13 +++++ .../cluster/DecommissionBackendTest.java | 9 ++- .../doris/utframe/TestWithFeService.java | 56 ++++++++++++------- 5 files changed, 60 insertions(+), 25 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index f28be36a8868c6..35fb7702b98dcd 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -761,7 +761,7 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() { report_version = _s_report_version; StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info( &request.tablets); - if (report_version == s_report_version) { + if (report_version == _s_report_version) { break; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 99cd99dca06107..c41a70d60aea4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -172,6 +172,11 @@ public long getId() { return id; } + // Return ip:heartbeat port + public String getAddress() { + return host + ":" + heartbeatPort; + } + public String getHost() { return host; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java index d7fdb2694a8282..99769a6b525058 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java @@ -20,6 +20,8 @@ import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.Sets; @@ -42,6 +44,7 @@ public class TabletTest { private Replica replica3; private TabletInvertedIndex invertedIndex; + private SystemInfoService infoService; @Mocked private Env env; @@ -49,6 +52,12 @@ public class TabletTest { @Before public void makeTablet() { invertedIndex = new TabletInvertedIndex(); + infoService = new SystemInfoService(); + for (long beId = 1L; beId <= 4L; beId++) { + Backend be = new Backend(beId, "127.0.0." + beId, 8030); + be.setAlive(true); + infoService.addBackend(be); + } new Expectations(env) { { Env.getCurrentEnvJournalVersion(); @@ -59,6 +68,10 @@ public void makeTablet() { minTimes = 0; result = invertedIndex; + Env.getCurrentSystemInfo(); + minTimes = 0; + result = infoService; + Env.isCheckpointThread(); minTimes = 0; result = false; diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index ff5f5292be8926..e67cbccb668afa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -97,8 +97,8 @@ public void testDecommissionBackend() throws Exception { } } - Assertions.assertTrue(srcBackend != null); - String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\""; + Assertions.assertNotNull(srcBackend); + String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); @@ -163,7 +163,7 @@ public void testDecommissionBackendWithDropTable() throws Exception { dropTable("db2.tbl1", false); // 6. execute decommission - String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\""; + String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); Assertions.assertEquals(true, srcBackend.isDecommissioned()); @@ -240,8 +240,7 @@ public void testDecommissionBackendWithColocateGroup() throws Exception { // 4. query tablet num int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size(); - String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" - + srcBackend.getHeartbeatPort() + "\""; + String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 59e4eae0a3729d..db77fcd07ecfd5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -386,7 +386,7 @@ protected int startFEServerWithoutRetry(String runningDir) protected void createDorisCluster() throws InterruptedException, NotInitException, IOException, DdlException, EnvVarNotSetException, FeStartException { - createDorisCluster(runningDir, backendNum()); + createDorisClusterWithMultiTag(runningDir, backendNum()); } protected void createDorisCluster(String runningDir, int backendNum) @@ -399,26 +399,13 @@ protected void createDorisCluster(String runningDir, int backendNum) bes.add(createBackend("127.0.0.1", feRpcPort)); } System.out.println("after create backend"); - checkBEHeartbeat(bes); + if (!checkBEHeartbeat(bes)) { + System.out.println("Some backends dead, all backends: " + bes); + } // Thread.sleep(2000); System.out.println("after create backend2"); } - private void checkBEHeartbeat(List bes) throws InterruptedException { - int maxTry = Config.heartbeat_interval_second + 5; - boolean allAlive = false; - while (maxTry-- > 0 && !allAlive) { - Thread.sleep(1000); - boolean hasDead = false; - for (Backend be : bes) { - if (!be.isAlive()) { - hasDead = true; - } - } - allAlive = !hasDead; - } - } - // Create multi backends with different host for unit test. // the host of BE will be "127.0.0.1", "127.0.0.2" protected void createDorisClusterWithMultiTag(String runningDir, int backendNum) @@ -426,14 +413,45 @@ protected void createDorisClusterWithMultiTag(String runningDir, int backendNum) InterruptedException { // set runningUnitTest to true, so that for ut, the agent task will be send to "127.0.0.1" // to make cluster running well. - FeConstants.runningUnitTest = true; + if (backendNum > 1) { + FeConstants.runningUnitTest = true; + } int feRpcPort = startFEServer(runningDir); List bes = Lists.newArrayList(); + System.out.println("start create backend, backend num " + backendNum); for (int i = 0; i < backendNum; i++) { String host = "127.0.0." + (i + 1); bes.add(createBackend(host, feRpcPort)); } - checkBEHeartbeat(bes); + System.out.println("after create backend"); + if (!checkBEHeartbeat(bes)) { + System.out.println("Some backends dead, all backends: " + bes); + } + System.out.println("after create backend2"); + } + + protected boolean checkBEHeartbeat(List bes) { + return checkBEHeartbeatStatus(bes, true); + } + + protected boolean checkBELostHeartbeat(List bes) { + return checkBEHeartbeatStatus(bes, false); + } + + private boolean checkBEHeartbeatStatus(List bes, boolean isAlive) { + int maxTry = Config.heartbeat_interval_second + 2; + while (maxTry-- > 0) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // no exception + } + if (bes.stream().allMatch(be -> be.isAlive() == isAlive)) { + return true; + } + } + + return false; } protected Backend addNewBackend() throws IOException, InterruptedException {